From b54adab7e2c2dc096cef5d059ea17364241406c9 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Thu, 10 Nov 2022 15:23:46 -0700 Subject: [PATCH 1/8] Add drop function to the transform processor --- .chloggen/tp-drop.yaml | 16 ++ processor/transformprocessor/README.md | 59 +++++- .../internal/common/func_drop.go | 27 +++ .../internal/common/func_drop_test.go | 30 ++++ .../internal/common/functions.go | 1 + .../internal/common/logs.go | 33 ++-- .../internal/common/metrics.go | 169 ++++++++---------- .../internal/common/processor.go | 142 ++++++++------- .../internal/common/traces.go | 72 ++++---- .../internal/logs/processor_test.go | 59 ++++++ .../internal/metrics/processor_test.go | 63 +++++++ .../internal/traces/processor_test.go | 63 +++++++ 12 files changed, 522 insertions(+), 212 deletions(-) create mode 100755 .chloggen/tp-drop.yaml create mode 100644 processor/transformprocessor/internal/common/func_drop.go create mode 100644 processor/transformprocessor/internal/common/func_drop_test.go diff --git a/.chloggen/tp-drop.yaml b/.chloggen/tp-drop.yaml new file mode 100755 index 000000000000..8a79e45a3d91 --- /dev/null +++ b/.chloggen/tp-drop.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds a `drop` function to the transform processor that allows dropping of telemetry. + +# One or more tracking issues related to the change +issues: [16297] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index f9f77dce05f7..a609b3fcd4d6 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -12,6 +12,15 @@ The transform processor modifies telemetry based on configuration using the [Ope For each signal type, the processor takes a list of statements associated to a [Context type](#contexts) and executes the statements against the incoming telemetry in the order specified in the config. Each statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed. +**Table of Contents** +- [Config](#config) +- [Example](#example) +- [Grammar](#grammar) +- [Contexts](#contexts) +- [Supported Functions](#supported-functions) +- [Contributing](#contributing) +- [Warnings](#warnings) + ## Config The transform processor allows configuring multiple context statements for traces, metrics, and logs. 
@@ -105,11 +114,11 @@ The contexts allow the OTTL to interact with the underlying telemetry data in it - [Resource Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlresource) - [Scope Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlscope) -- [Span Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspan) +- [Span Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspan) - [SpanEvent Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspanevent) - [Metric Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlmetric) -- [DataPoint Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoint) -- [Log Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllog) +- [DataPoint Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoint) +- [Log Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllog) Each context allows transformation of its type of telemetry. For example, statements associated to a `resource` context will be able to transform the resource's `attributes` and `dropped_attributes_count`. @@ -152,17 +161,55 @@ This is because contexts are nested: the efficiency comes because higher-level c ## Supported functions: Since the transform processor utilizes the OTTL's contexts for Traces, Metrics, and Logs, it is able to utilize functions that expect pdata in addition to any common functions. These common functions can be used for any signal. 
- + - [OTTL Functions](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/ottlfuncs) In addition to OTTL functions, the processor defines its own functions to help with transformations specific to this processor: +**Common functions** +- [drop](#drop) + **Metrics only functions** - [convert_sum_to_gauge](#convert_sum_to_gauge) - [convert_gauge_to_sum](#convert_gauge_to_sum) - [convert_summary_count_val_to_sum](#convert_summary_count_val_to_sum) - [convert_summary_sum_val_to_sum](#convert_summary_sum_val_to_sum) +## drop + +`drop()` + +Drops the specified [Context's](#contexts) telemetry from the collector. Best used with a condition, otherwise all the Context's telemetry would be dropped. + +If a Metric ever has an empty DataPoints slice, the metric will also be dropped. +If a Scope ever has an empty Span/Metric/Log slice the scope will also be dropped. +If a Resource ever has an empty Scope slice the resource will also be dropped. +An empty SpanEvents slice will not cause a Span to be dropped. + +**Be very careful when dropping telemetry**. Dropping telemetry can result in [unsound transformations or orphaned telemetry](#warnings). +Drop does not attempt to reconcile any issues such as orphaned spans/logs or re-aggregation of metric datapoints. + +Examples: +```yaml +- context: datapoint + statements: + # drops any datapoint that has an attribute named "test" with a value of "pass". + - drop() where attributes["test"] == "pass" +``` +```yaml +- context: resource + statements: + # drops any resource (and therefore all its scopes and spans/metrics/logs) + # that has an attribute named "test" with a value of "pass". 
+ - drop() where attributes["test"] == "pass" +``` +```yaml +- context: spanevent + statements: + # drops any span event that has a name of "drop-me" + - drop() where name == "drop-me" +``` + ## convert_sum_to_gauge `convert_sum_to_gauge()` @@ -241,8 +288,8 @@ The transform processor's implementation of the [OpenTelemetry Transformation La - [Unsound Transformations](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#unsound-transformations): Several Metric-only functions allow you to transform one metric data type to another or create new metrics from an existing metrics. Transformations between metric data types are not defined in the [metrics data model](https://github.com/open-telemetry/opentelemetry-specification/blob/main//specification/metrics/data-model.md). These functions have the expectation that you understand the incoming data and know that it can be meaningfully converted to a new metric data type or can meaningfully be used to create new metrics. - Although the OTTL allows the `set` function to be used with `metric.data_type`, its implementation in the transform processor is NOOP. To modify a data type you must use a function specific to that purpose. -- [Identity Conflict](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#identity-conflict): Transformation of metrics have the potential to affect the identity of a metric leading to an Identity Crisis. Be especially cautious when transforming metric name and when reducing/changing existing attributes. Adding new attributes is safe. -- [Orphaned Telemetry](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#orphaned-telemetry): The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces and `span_id`, and `trace_id` logs. Modifying these fields could lead to orphaned spans or logs. 
+- [Identity Conflict](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#identity-conflict): Transformation of metrics have the potential to affect the identity of a metric leading to an Identity Crisis. Be especially cautious when transforming metric name, when reducing/changing existing attributes, and when dropping datapoints. Adding new attributes is safe. +- [Orphaned Telemetry](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#orphaned-telemetry): The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces and `span_id`, and `trace_id` logs. It also allows dropping spans and logs. Modifying these fields or dropping spans/logs could lead to orphaned spans or logs. [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib diff --git a/processor/transformprocessor/internal/common/func_drop.go b/processor/transformprocessor/internal/common/func_drop.go new file mode 100644 index 000000000000..40e123198a1b --- /dev/null +++ b/processor/transformprocessor/internal/common/func_drop.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +func drop[K any]() (ottl.ExprFunc[K], error) { + return func(ctx context.Context, tCtx K) (interface{}, error) { + return shouldRemove(true), nil + }, nil +} diff --git a/processor/transformprocessor/internal/common/func_drop_test.go b/processor/transformprocessor/internal/common/func_drop_test.go new file mode 100644 index 000000000000..d3d8eec7056a --- /dev/null +++ b/processor/transformprocessor/internal/common/func_drop_test.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_drop(t *testing.T) { + exprFunc, err := drop[any]() + assert.NoError(t, err) + result, err := exprFunc(context.Background(), nil) + assert.NoError(t, err) + assert.Equal(t, shouldRemove(true), result) +} diff --git a/processor/transformprocessor/internal/common/functions.go b/processor/transformprocessor/internal/common/functions.go index 7cf74a0e3652..0fd0863d3417 100644 --- a/processor/transformprocessor/internal/common/functions.go +++ b/processor/transformprocessor/internal/common/functions.go @@ -39,6 +39,7 @@ func Functions[K any]() map[string]interface{} { "replace_all_patterns": ottlfuncs.ReplaceAllPatterns[K], "delete_key": ottlfuncs.DeleteKey[K], "delete_matching_keys": ottlfuncs.DeleteMatchingKeys[K], + "drop": drop[K], } } diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 8267b2dcbe4d..c8a943701b1f 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" @@ -38,23 +39,23 @@ func (l logStatements) Capabilities() consumer.Capabilities { } func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - for i := 0; i < ld.ResourceLogs().Len(); i++ { - rlogs := ld.ResourceLogs().At(i) - for j := 0; j < rlogs.ScopeLogs().Len(); j++ { - slogs := rlogs.ScopeLogs().At(j) - logs := slogs.LogRecords() - for k := 0; k < logs.Len(); k++ { - tCtx := 
ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) - for _, statement := range l { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + var errors error + ld.ResourceLogs().RemoveIf(func(rlogs plog.ResourceLogs) bool { + rlogs.ScopeLogs().RemoveIf(func(slogs plog.ScopeLogs) bool { + slogs.LogRecords().RemoveIf(func(logRecord plog.LogRecord) bool { + tCtx := ottllog.NewTransformContext(logRecord, slogs.Scope(), rlogs.Resource()) + remove, err := executeStatements(ctx, tCtx, l) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - } - } - return nil + return bool(remove) + }) + return slogs.LogRecords().Len() == 0 + }) + return rlogs.ScopeLogs().Len() == 0 + }) + return errors } type LogParserCollection struct { diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 097818dffb51..8e6b71c9ef92 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -19,8 +19,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" @@ -40,23 +40,23 @@ func (m metricStatements) Capabilities() consumer.Capabilities { } func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rmetrics := md.ResourceMetrics().At(i) - for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { - smetrics := rmetrics.ScopeMetrics().At(j) - metrics := smetrics.Metrics() - for k := 0; k < metrics.Len(); k++ { - tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Scope(), 
rmetrics.Resource()) - for _, statement := range m { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + var errors error + md.ResourceMetrics().RemoveIf(func(rmetrics pmetric.ResourceMetrics) bool { + rmetrics.ScopeMetrics().RemoveIf(func(smetrics pmetric.ScopeMetrics) bool { + smetrics.Metrics().RemoveIf(func(metric pmetric.Metric) bool { + tCtx := ottlmetric.NewTransformContext(metric, smetrics.Scope(), rmetrics.Resource()) + remove, err := executeStatements(ctx, tCtx, m) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - } - } - return nil + return bool(remove) + }) + return smetrics.Metrics().Len() == 0 + }) + return rmetrics.ScopeMetrics().Len() == 0 + }) + return errors } var _ consumer.Metrics = &dataPointStatements{} @@ -70,87 +70,74 @@ func (d dataPointStatements) Capabilities() consumer.Capabilities { } func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rmetrics := md.ResourceMetrics().At(i) - for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { - smetrics := rmetrics.ScopeMetrics().At(j) - metrics := smetrics.Metrics() - for k := 0; k < metrics.Len(); k++ { - metric := metrics.At(k) - var err error + var errors error + md.ResourceMetrics().RemoveIf(func(rmetrics pmetric.ResourceMetrics) bool { + rmetrics.ScopeMetrics().RemoveIf(func(smetrics pmetric.ScopeMetrics) bool { + smetrics.Metrics().RemoveIf(func(metric pmetric.Metric) bool { switch metric.Type() { case pmetric.MetricTypeSum: - err = d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + metric.Sum().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = 
multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return metric.Sum().DataPoints().Len() == 0 case pmetric.MetricTypeGauge: - err = d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + metric.Gauge().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return metric.Gauge().DataPoints().Len() == 0 case pmetric.MetricTypeHistogram: - err = d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + metric.Histogram().DataPoints().RemoveIf(func(datapoint pmetric.HistogramDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return metric.Histogram().DataPoints().Len() == 0 case pmetric.MetricTypeExponentialHistogram: - err = d.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + metric.ExponentialHistogram().DataPoints().RemoveIf(func(datapoint pmetric.ExponentialHistogramDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return metric.ExponentialHistogram().DataPoints().Len() == 0 case pmetric.MetricTypeSummary: - err = 
d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) - } - if err != nil { - return err + metric.Summary().DataPoints().RemoveIf(func(datapoint pmetric.SummaryDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return metric.Summary().DataPoints().Len() == 0 } - } - } - } - return nil -} - -func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (d dataPointStatements) 
handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { - for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.callFunctions(ctx, tCtx) - if err != nil { - return err - } - } - return nil -} - -func (d dataPointStatements) callFunctions(ctx context.Context, tCtx ottldatapoint.TransformContext) error { - for _, statement := range d { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } - } - return nil + return false + }) + return smetrics.Metrics().Len() == 0 + }) + return rmetrics.ScopeMetrics().Len() == 0 + }) + return errors } type MetricParserCollection struct { diff --git a/processor/transformprocessor/internal/common/processor.go b/processor/transformprocessor/internal/common/processor.go index 9d49fb45ddb4..5c371b29abcb 100644 --- a/processor/transformprocessor/internal/common/processor.go +++ b/processor/transformprocessor/internal/common/processor.go @@ -23,12 +23,28 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" ) +type shouldRemove bool + +func executeStatements[K any](ctx context.Context, tCtx K, statements []*ottl.Statement[K]) (shouldRemove, error) { + for _, statement := range statements { + value, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return false, err + } + if remove, ok := value.(shouldRemove); ok && bool(remove) { + return true, nil + } + } + return false, nil +} + var _ consumer.Traces = &resourceStatements{} var _ 
consumer.Metrics = &resourceStatements{} var _ consumer.Logs = &resourceStatements{} @@ -43,45 +59,45 @@ func (r resourceStatements) Capabilities() consumer.Capabilities { } func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - for i := 0; i < td.ResourceSpans().Len(); i++ { - rspans := td.ResourceSpans().At(i) + var errors error + td.ResourceSpans().RemoveIf(func(rspans ptrace.ResourceSpans) bool { tCtx := ottlresource.NewTransformContext(rspans.Resource()) - for _, statement := range r { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + remove, err := executeStatements(ctx, tCtx, r) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - return nil + return bool(remove) + }) + return errors } func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rmetrics := md.ResourceMetrics().At(i) + var errors error + md.ResourceMetrics().RemoveIf(func(rmetrics pmetric.ResourceMetrics) bool { tCtx := ottlresource.NewTransformContext(rmetrics.Resource()) - for _, statement := range r { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + remove, err := executeStatements(ctx, tCtx, r) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - return nil + return bool(remove) + }) + return errors } func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - for i := 0; i < ld.ResourceLogs().Len(); i++ { - rlogs := ld.ResourceLogs().At(i) + var errors error + ld.ResourceLogs().RemoveIf(func(rlogs plog.ResourceLogs) bool { tCtx := ottlresource.NewTransformContext(rlogs.Resource()) - for _, statement := range r { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + remove, err := executeStatements(ctx, tCtx, r) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - return 
nil + return bool(remove) + }) + return errors } var _ consumer.Traces = &scopeStatements{} @@ -98,54 +114,54 @@ func (s scopeStatements) Capabilities() consumer.Capabilities { } func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - for i := 0; i < td.ResourceSpans().Len(); i++ { - rspans := td.ResourceSpans().At(i) - for j := 0; j < rspans.ScopeSpans().Len(); j++ { - sspans := rspans.ScopeSpans().At(j) + var errors error + td.ResourceSpans().RemoveIf(func(rspans ptrace.ResourceSpans) bool { + rspans.ScopeSpans().RemoveIf(func(sspans ptrace.ScopeSpans) bool { tCtx := ottlscope.NewTransformContext(sspans.Scope(), rspans.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + remove, err := executeStatements(ctx, tCtx, s) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - } - return nil + return bool(remove) + }) + return rspans.ScopeSpans().Len() == 0 + }) + return errors } func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rmetrics := md.ResourceMetrics().At(i) - for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { - smetrics := rmetrics.ScopeMetrics().At(j) + var errors error + md.ResourceMetrics().RemoveIf(func(rmetrics pmetric.ResourceMetrics) bool { + rmetrics.ScopeMetrics().RemoveIf(func(smetrics pmetric.ScopeMetrics) bool { tCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + remove, err := executeStatements(ctx, tCtx, s) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - } - return nil + return bool(remove) + }) + return rmetrics.ScopeMetrics().Len() == 0 + }) + return errors } func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - for i := 0; i < 
ld.ResourceLogs().Len(); i++ { - rlogs := ld.ResourceLogs().At(i) - for j := 0; j < rlogs.ScopeLogs().Len(); j++ { - slogs := rlogs.ScopeLogs().At(j) + var errors error + ld.ResourceLogs().RemoveIf(func(rlogs plog.ResourceLogs) bool { + rlogs.ScopeLogs().RemoveIf(func(slogs plog.ScopeLogs) bool { tCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + remove, err := executeStatements(ctx, tCtx, s) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - } - return nil + return bool(remove) + }) + return rlogs.ScopeLogs().Len() == 0 + }) + return errors } type parserCollection struct { diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index 9a5c4b7fb927..7eaa0fa5f292 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" @@ -39,23 +40,23 @@ func (t traceStatements) Capabilities() consumer.Capabilities { } func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - for i := 0; i < td.ResourceSpans().Len(); i++ { - rspans := td.ResourceSpans().At(i) - for j := 0; j < rspans.ScopeSpans().Len(); j++ { - sspans := rspans.ScopeSpans().At(j) - spans := sspans.Spans() - for k := 0; k < spans.Len(); k++ { - tCtx := ottlspan.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource()) - for _, statement := range t { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + var errors error 
+ td.ResourceSpans().RemoveIf(func(rspans ptrace.ResourceSpans) bool { + rspans.ScopeSpans().RemoveIf(func(sspans ptrace.ScopeSpans) bool { + sspans.Spans().RemoveIf(func(span ptrace.Span) bool { + tCtx := ottlspan.NewTransformContext(span, sspans.Scope(), rspans.Resource()) + remove, err := executeStatements(ctx, tCtx, t) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - } - } - return nil + return bool(remove) + }) + return sspans.Spans().Len() == 0 + }) + return rspans.ScopeSpans().Len() == 0 + }) + return errors } var _ consumer.Traces = &spanEventStatements{} @@ -69,27 +70,26 @@ func (s spanEventStatements) Capabilities() consumer.Capabilities { } func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - for i := 0; i < td.ResourceSpans().Len(); i++ { - rspans := td.ResourceSpans().At(i) - for j := 0; j < rspans.ScopeSpans().Len(); j++ { - sspans := rspans.ScopeSpans().At(j) - spans := sspans.Spans() - for k := 0; k < spans.Len(); k++ { - span := spans.At(k) - spanEvents := span.Events() - for n := 0; n < spanEvents.Len(); n++ { - tCtx := ottlspanevent.NewTransformContext(spanEvents.At(k), span, sspans.Scope(), rspans.Resource()) - for _, statement := range s { - _, _, err := statement.Execute(ctx, tCtx) - if err != nil { - return err - } + var errors error + td.ResourceSpans().RemoveIf(func(rspans ptrace.ResourceSpans) bool { + rspans.ScopeSpans().RemoveIf(func(sspans ptrace.ScopeSpans) bool { + sspans.Spans().RemoveIf(func(span ptrace.Span) bool { + span.Events().RemoveIf(func(spanEvent ptrace.SpanEvent) bool { + tCtx := ottlspanevent.NewTransformContext(spanEvent, span, sspans.Scope(), rspans.Resource()) + remove, err := executeStatements(ctx, tCtx, s) + if err != nil { + errors = multierr.Append(errors, err) + return false } - } - } - } - } - return nil + return bool(remove) + }) + return false + }) + return false + }) + return false + }) + return errors } type TraceParserCollection struct { 
diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index f33be8d7905a..d2e51c3ba5bd 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -54,6 +54,17 @@ func Test_ProcessLogs_ResourceContext(t *testing.T) { want: func(td plog.Logs) { }, }, + { + statement: `drop()`, + want: func(td plog.Logs) { + empty := plog.NewResourceLogsSlice() + empty.CopyTo(td.ResourceLogs()) + }, + }, + { + statement: `drop() where dropped_attributes_count == 100`, + want: func(td plog.Logs) {}, + }, } for _, tt := range tests { @@ -89,6 +100,17 @@ func Test_ProcessLogs_ScopeContext(t *testing.T) { want: func(td plog.Logs) { }, }, + { + statement: `drop()`, + want: func(td plog.Logs) { + empty := plog.NewResourceLogsSlice() + empty.CopyTo(td.ResourceLogs()) + }, + }, + { + statement: `drop() where name != "scope"`, + want: func(td plog.Logs) {}, + }, } for _, tt := range tests { @@ -289,6 +311,21 @@ func Test_ProcessLogs_LogContext(t *testing.T) { td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "OperationA") }, }, + { + statement: `drop()`, + want: func(td plog.Logs) { + empty := plog.NewResourceLogsSlice() + empty.CopyTo(td.ResourceLogs()) + }, + }, + { + statement: `drop() where body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().RemoveIf(func(logRecord plog.LogRecord) bool { + return logRecord.Body().AsString() == "operationA" + }) + }, + }, } for _, tt := range tests { @@ -406,6 +443,28 @@ func Test_ProcessLogs_MixContext(t *testing.T) { td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") }, }, + { + name: "drop stops statement execution", + contextStatments: []common.ContextStatements{ + { + Context: "log", + Statements: []string{ + `drop() where 
instrumentation_scope.name == "scope"`, + `set(attributes["test"], "pass")`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + }, + want: func(td plog.Logs) { + empty := plog.NewResourceLogsSlice() + empty.CopyTo(td.ResourceLogs()) + }, + }, } for _, tt := range tests { diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 26c4359d6d7e..c69097dfc0cb 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -50,6 +50,17 @@ func Test_ProcessMetrics_ResourceContext(t *testing.T) { want: func(td pmetric.Metrics) { }, }, + { + statement: `drop()`, + want: func(td pmetric.Metrics) { + empty := pmetric.NewResourceMetricsSlice() + empty.CopyTo(td.ResourceMetrics()) + }, + }, + { + statement: `drop() where dropped_attributes_count == 100`, + want: func(td pmetric.Metrics) {}, + }, } for _, tt := range tests { @@ -85,6 +96,17 @@ func Test_ProcessMetrics_ScopeContext(t *testing.T) { want: func(td pmetric.Metrics) { }, }, + { + statement: `drop()`, + want: func(td pmetric.Metrics) { + empty := pmetric.NewResourceMetricsSlice() + empty.CopyTo(td.ResourceMetrics()) + }, + }, + { + statement: `drop() where name != "scope"`, + want: func(td pmetric.Metrics) {}, + }, } for _, tt := range tests { @@ -456,6 +478,25 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test_camel", "OperationA") }, }, + { + statements: []string{ + `drop()`, + }, + want: func(td pmetric.Metrics) { + empty := pmetric.NewResourceMetricsSlice() + empty.CopyTo(td.ResourceMetrics()) + }, + }, + { + statements: []string{ + `drop() where metric.type == METRIC_DATA_TYPE_EXPONENTIAL_HISTOGRAM`, + }, + want: func(td pmetric.Metrics) { + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().RemoveIf(func(metric pmetric.Metric) bool { + return metric.Type() == pmetric.MetricTypeExponentialHistogram + }) + }, + }, } for _, tt := range tests { @@ -588,6 +629,28 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") }, }, + { + name: "drop stops statement execution", + contextStatments: []common.ContextStatements{ + { + Context: "metric", + Statements: []string{ + `drop() where instrumentation_scope.name == "scope"`, + `set(name, "pass")`, + }, + }, + { + Context: "scope", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + }, + want: func(td pmetric.Metrics) { + empty := pmetric.NewResourceMetricsSlice() + empty.CopyTo(td.ResourceMetrics()) + }, + }, } for _, tt := range tests { diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 31d5d106321d..dbf55e08cb44 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -55,6 +55,17 @@ func Test_ProcessTraces_ResourceContext(t *testing.T) { want: func(td ptrace.Traces) { }, }, + { + statement: `drop()`, + want: func(td ptrace.Traces) { + empty := ptrace.NewResourceSpansSlice() + empty.CopyTo(td.ResourceSpans()) + }, + }, + { + statement: `drop() where dropped_attributes_count == 100`, + want: func(td ptrace.Traces) {}, + }, } for _, tt := range tests { @@ -90,6 +101,17 @@ func Test_ProcessTraces_ScopeContext(t *testing.T) { want: func(td ptrace.Traces) { }, }, + { + statement: `drop()`, + want: func(td ptrace.Traces) { + empty := ptrace.NewResourceSpansSlice() + empty.CopyTo(td.ResourceSpans()) + }, + }, + { + statement: `drop() where name != "scope"`, + want: func(td ptrace.Traces) {}, + }, } for _, tt := range tests { @@ 
-333,6 +355,21 @@ func Test_ProcessTraces_TraceContext(t *testing.T) { td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "OperationA") }, }, + { + statement: `drop()`, + want: func(td ptrace.Traces) { + empty := ptrace.NewResourceSpansSlice() + empty.CopyTo(td.ResourceSpans()) + }, + }, + { + statement: `drop() where name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().RemoveIf(func(span ptrace.Span) bool { + return span.Name() == "operationA" + }) + }, + }, } for _, tt := range tests { @@ -450,6 +487,30 @@ func Test_ProcessTraces_MixContext(t *testing.T) { td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass") }, }, + { + name: "drop stops statement execution", + contextStatments: []common.ContextStatements{ + { + Context: "spanevent", + Statements: []string{ + `drop() where name == "spanEventA"`, + `set(name, "pass")`, + }, + }, + { + Context: "span", + Statements: []string{ + `set(attributes["test"], "pass")`, + }, + }, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass") + empty := ptrace.NewSpanEventSlice() + empty.CopyTo(td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Events()) + }, + }, } for _, tt := range tests { @@ -609,6 +670,8 @@ func fillSpanTwo(span ptrace.Span) { span.Attributes().PutStr("http.path", "/health") span.Attributes().PutStr("http.url", "http://localhost/health") span.Attributes().PutStr("flags", "C|D") + spanEvent := span.Events().AppendEmpty() + spanEvent.SetName("spanEventA") link0 := span.Links().AppendEmpty() link0.SetDroppedAttributesCount(4) link1 := span.Links().AppendEmpty() From a9a3a618f259176d4b9a62903bbd2ea30c8251f9 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: 
Wed, 16 Nov 2022 09:30:48 -0700 Subject: [PATCH 2/8] run make gotidy --- processor/transformprocessor/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/transformprocessor/go.mod b/processor/transformprocessor/go.mod index 2907e5732812..bb4449f3d1dd 100644 --- a/processor/transformprocessor/go.mod +++ b/processor/transformprocessor/go.mod @@ -7,6 +7,7 @@ require ( github.com/stretchr/testify v1.8.1 go.opentelemetry.io/collector v0.64.2-0.20221115155901-1550938c18fd go.opentelemetry.io/collector/pdata v0.64.2-0.20221115155901-1550938c18fd + go.uber.org/multierr v1.8.0 go.uber.org/zap v1.23.0 ) @@ -33,7 +34,6 @@ require ( go.opentelemetry.io/otel/metric v0.33.0 // indirect go.opentelemetry.io/otel/trace v1.11.1 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.8.0 // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/sys v0.2.0 // indirect From dc5531be8b512b0d1002da62f47473f213535f6f Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 16 Nov 2022 09:52:45 -0700 Subject: [PATCH 3/8] Move datapoint handling back to funcs --- .../internal/common/metrics.go | 125 +++++++++++------- 1 file changed, 79 insertions(+), 46 deletions(-) diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 8e6b71c9ef92..3e3ec43b9f34 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -16,6 +16,7 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "context" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -76,62 +77,38 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr 
smetrics.Metrics().RemoveIf(func(metric pmetric.Metric) bool { switch metric.Type() { case pmetric.MetricTypeSum: - metric.Sum().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Sum().DataPoints().Len() == 0 case pmetric.MetricTypeGauge: - metric.Gauge().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Gauge().DataPoints().Len() == 0 case pmetric.MetricTypeHistogram: - metric.Histogram().DataPoints().RemoveIf(func(datapoint pmetric.HistogramDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Histogram().DataPoints().Len() == 
0 case pmetric.MetricTypeExponentialHistogram: - metric.ExponentialHistogram().DataPoints().RemoveIf(func(datapoint pmetric.ExponentialHistogramDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.ExponentialHistogram().DataPoints().Len() == 0 case pmetric.MetricTypeSummary: - metric.Summary().DataPoints().RemoveIf(func(datapoint pmetric.SummaryDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Summary().DataPoints().Len() == 0 + default: + return false } - return false }) return smetrics.Metrics().Len() == 0 }) @@ -140,6 +117,62 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr return errors } +func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := 
executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + +func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.HistogramDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + +func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.ExponentialHistogramDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + +func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.SummaryDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + type MetricParserCollection struct { parserCollection metricParser 
ottl.Parser[ottlmetric.TransformContext] From 59240bdabf50389fa4f87aecf8ee059cd8ec35ab Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 16 Nov 2022 10:16:28 -0700 Subject: [PATCH 4/8] Fix lint --- processor/transformprocessor/internal/common/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 3e3ec43b9f34..ee2a2f20513c 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -16,10 +16,10 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "context" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" From 8112f52651b65aab5e5b0b956fe7028d83a9d189 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Thu, 17 Nov 2022 16:08:49 -0700 Subject: [PATCH 5/8] Update processor/transformprocessor/README.md Co-authored-by: Evan Bradley --- processor/transformprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index a609b3fcd4d6..63bd767a73fc 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -12,7 +12,7 @@ The transform processor modifies telemetry based on configuration using the [Ope For each signal type, the processor takes a list of statements associated to a [Context type](#contexts) and executes the statements against the incoming telemetry in the order specified in the config. 
Each statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed. -**Tables of Contents** +**Table of Contents** - [Config](#config) - [Example](#example) - [Grammar](#grammar) From 7de801ce77604fbb3298ff27f9358ffeb025e0ba Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Thu, 17 Nov 2022 16:09:20 -0700 Subject: [PATCH 6/8] Update processor/transformprocessor/README.md Co-authored-by: Evan Bradley --- processor/transformprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 63bd767a73fc..040b86403aa0 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -179,7 +179,7 @@ In addition to OTTL functions, the processor defines its own functions to help w `drop()` -Drops the specified [Context's](#contexts) telemetry from the collector. Best used with a condition, otherwise all the Context's telemetry would be dropped. +Drops the specified [Context's](#contexts) telemetry from the pipeline. Best used with a condition, otherwise all the Context's telemetry would be dropped. If a Metric ever has an empty DataPoints slice, the metric will also be dropped. If a Scope ever has an empty Span/Metric/Log slice the scope will also be dropped. 
From 1581efcbf5ee272f9606e6c33d006f4b599d7a36 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Thu, 17 Nov 2022 16:12:23 -0700 Subject: [PATCH 7/8] Add more tests --- .../internal/metrics/processor_test.go | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index c69097dfc0cb..14cdb959ab90 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -487,6 +487,36 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { empty.CopyTo(td.ResourceMetrics()) }, }, + { + statements: []string{ + `drop() where metric.type == METRIC_DATA_TYPE_SUM`, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().RemoveIf(func(metric pmetric.Metric) bool { + return metric.Type() == pmetric.MetricTypeSum + }) + }, + }, + { + statements: []string{ + `drop() where metric.type == METRIC_DATA_TYPE_GAUGE`, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().RemoveIf(func(metric pmetric.Metric) bool { + return metric.Type() == pmetric.MetricTypeGauge + }) + }, + }, + { + statements: []string{ + `drop() where metric.type == METRIC_DATA_TYPE_HISTOGRAM`, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().RemoveIf(func(metric pmetric.Metric) bool { + return metric.Type() == pmetric.MetricTypeHistogram + }) + }, + }, { statements: []string{ `drop() where metric.type == METRIC_DATA_TYPE_EXPONENTIAL_HISTOGRAM`, @@ -497,6 +527,16 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { }) }, }, + { + statements: []string{ + `drop() where metric.type == METRIC_DATA_TYPE_SUMMARY`, + }, + want: func(td pmetric.Metrics) { + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().RemoveIf(func(metric pmetric.Metric) bool { + return metric.Type() == pmetric.MetricTypeSummary + }) + }, + }, } for _, tt := range tests { From 040358079a1f532fec05c31c5002f5abf141ebeb Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Fri, 18 Nov 2022 09:25:26 -0700 Subject: [PATCH 8/8] Add gauge data points to processor tests --- .../internal/metrics/processor_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 14cdb959ab90..29eac74782ac 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -149,6 +149,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("test", "pass") }, }, { @@ -181,6 +182,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetDescription("test") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetDescription("test") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetDescription("test") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetDescription("test") }, }, { @@ -190,6 +192,7 @@ func Test_ProcessMetrics_DataPointContext(t 
*testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") }, }, { @@ -253,6 +256,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("attr1", "pass") }, }, { @@ -265,6 +269,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("attr1", "pass") }, }, { @@ -308,6 +313,11 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "test1") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr2", "test2") 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr4", "test3") + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("attr4", "test3") }, }, { @@ -382,6 +392,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") }, }, { @@ -587,6 +598,7 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("test", "pass") }, }, { @@ -614,6 +626,7 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("test", "pass") }, }, { @@ -667,6 +680,7 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Gauge().DataPoints().At(0).Attributes().PutStr("test", "pass") }, }, { @@ -720,6 +734,7 @@ func constructMetrics() pmetric.Metrics { fillMetricTwo(rm0ils0.Metrics().AppendEmpty()) fillMetricThree(rm0ils0.Metrics().AppendEmpty()) fillMetricFour(rm0ils0.Metrics().AppendEmpty()) + fillMetricFive(rm0ils0.Metrics().AppendEmpty()) return td } @@ -811,3 +826,16 @@ func fillMetricFour(m pmetric.Metric) { quantileDataPoint1.SetQuantile(.95) quantileDataPoint1.SetValue(321) } + +func fillMetricFive(m pmetric.Metric) { + m.SetName("operationE") + m.SetDescription("operationE description") + m.SetUnit("operationE unit") + + dataPoint0 := m.SetEmptyGauge().DataPoints().AppendEmpty() + dataPoint0.SetStartTimestamp(StartTimestamp) + dataPoint0.SetDoubleValue(1.0) + dataPoint0.Attributes().PutStr("attr1", "test1") + dataPoint0.Attributes().PutStr("attr2", "test2") + dataPoint0.Attributes().PutStr("attr3", "test3") +}