From 0b6b8715cae82da6c0a08e1726403ad24bbdf4de Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 2 Aug 2024 11:41:55 +0200 Subject: [PATCH 1/9] [receiver/cumulativetodelta] add filter for metric type Signed-off-by: Florian Bacher --- .../cumulativetodeltaprocessor/config.go | 38 ++++++++++++++++ .../cumulativetodeltaprocessor/config_test.go | 40 +++++++++++++++++ .../cumulativetodeltaprocessor/processor.go | 31 +++++++------ .../testdata/config.yaml | 43 +++++++++++++++++++ 4 files changed, 140 insertions(+), 12 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index dcba656c838d..8b4439e14c98 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -5,6 +5,9 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "fmt" + "go.opentelemetry.io/collector/pdata/pmetric" + "slices" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -13,6 +16,20 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" ) +var validExcludeMetricTypes = []string{ + pmetric.MetricTypeSum.String(), + pmetric.MetricTypeExponentialHistogram.String(), + pmetric.MetricTypeHistogram.String(), + pmetric.MetricTypeEmpty.String(), + pmetric.MetricTypeSummary.String(), + pmetric.MetricTypeGauge.String(), +} + +var validIncludeMetricTypes = []string{ + pmetric.MetricTypeSum.String(), + pmetric.MetricTypeHistogram.String(), +} + // Config defines the configuration for the processor. type Config struct { // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. @@ -37,6 +54,8 @@ type MatchMetrics struct { filterset.Config `mapstructure:",squash"` Metrics []string `mapstructure:"metrics"` + + MetricTypes []string `mapstructure:"metric_types"` } var _ component.Config = (*Config)(nil) @@ -52,5 +71,24 @@ func (config *Config) Validate() error { (len(config.Exclude.MatchType) > 0 && len(config.Exclude.Metrics) == 0) { return fmt.Errorf("metrics must be supplied if match_type is set") } + + for _, metricType := range config.Exclude.MetricTypes { + if !slices.Contains(validExcludeMetricTypes, metricType) { + return fmt.Errorf( + "found invalid metric type in exclude.metric_types: %s. Valid values are [%s]", + metricType, + strings.Join(validExcludeMetricTypes, ","), + ) + } + } + for _, metricType := range config.Include.MetricTypes { + if !slices.Contains(validIncludeMetricTypes, metricType) { + return fmt.Errorf( + "found invalid metric type in include.metric_types: %s. Valid values are [%s]", + metricType, + strings.Join(validIncludeMetricTypes, ","), + ) + } + } return nil } diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index 97c3f8952077..5c7387e615a0 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -4,6 +4,7 @@ package cumulativetodeltaprocessor import ( + "go.opentelemetry.io/collector/pdata/pmetric" "path/filepath" "testing" "time" @@ -82,6 +83,45 @@ func TestLoadConfig(t *testing.T) { InitialValue: tracking.InitialValueAuto, }, }, + { + id: component.NewIDWithName(metadata.Type, "metric_type_filter"), + expected: &Config{ + Include: MatchMetrics{ + Metrics: []string{ + "a*", + }, + Config: filterset.Config{ + MatchType: "regexp", + RegexpConfig: nil, + }, + MetricTypes: []string{ + pmetric.MetricTypeSum.String(), + }, + }, + Exclude: MatchMetrics{ + Metrics: []string{ + "b*", + }, + Config: filterset.Config{ + MatchType: "regexp", + RegexpConfig: nil, + }, + MetricTypes: []string{ + pmetric.MetricTypeHistogram.String(), + }, + }, + MaxStaleness: 10 * time.Second, + InitialValue: tracking.InitialValueAuto, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "invalid_include_metric_type_filter"), + errorMessage: "found invalid metric type in include.metric_types: Gauge. Valid values are [Sum,Histogram]", + }, + { + id: component.NewIDWithName(metadata.Type, "invalid_exclude_metric_type_filter"), + errorMessage: "found invalid metric type in exclude.metric_types: Invalid. Valid values are [Sum,ExponentialHistogram,Histogram,Empty,Summary,Gauge]", + }, { id: component.NewIDWithName(metadata.Type, "missing_match_type"), errorMessage: "match_type must be set if metrics are supplied", diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 83130ea97ee4..37aac488bf80 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -6,6 +6,7 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" "math" + "slices" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -15,19 +16,23 @@ import ( ) type cumulativeToDeltaProcessor struct { - includeFS filterset.FilterSet - excludeFS filterset.FilterSet - logger *zap.Logger - deltaCalculator *tracking.MetricTracker - cancelFunc context.CancelFunc + includeFS filterset.FilterSet + excludeFS filterset.FilterSet + includeMetricTypes []string + excludeMetricTypes []string + logger *zap.Logger + deltaCalculator *tracking.MetricTracker + cancelFunc context.CancelFunc } func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { ctx, cancel := context.WithCancel(context.Background()) p := &cumulativeToDeltaProcessor{ - logger: logger, - deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue), - cancelFunc: cancel, + logger: logger, + deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue), + cancelFunc: cancel, + includeMetricTypes: config.Include.MetricTypes, + excludeMetricTypes: config.Exclude.MetricTypes, } if len(config.Include.Metrics) > 0 { p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config) @@ -43,7 +48,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { rm.ScopeMetrics().RemoveIf(func(ilm pmetric.ScopeMetrics) bool { ilm.Metrics().RemoveIf(func(m pmetric.Metric) bool { - if !ctdp.shouldConvertMetric(m.Name()) { + if !ctdp.shouldConvertMetric(m) { return false } switch m.Type() { @@ -111,9 +116,11 @@ func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error { return nil } -func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) bool { - return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metricName)) && - (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName)) +func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metric pmetric.Metric) bool { + return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metric.Name())) && + (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metric.Name())) && + (ctdp.includeMetricTypes == nil || slices.Contains(ctdp.includeMetricTypes, metric.Type().String())) && + (ctdp.excludeMetricTypes == nil || !slices.Contains(ctdp.excludeMetricTypes, metric.Type().String())) } func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in any, baseIdentity tracking.MetricIdentity) { diff --git a/processor/cumulativetodeltaprocessor/testdata/config.yaml b/processor/cumulativetodeltaprocessor/testdata/config.yaml index 31775d239adb..bbdf062580bc 100644 --- a/processor/cumulativetodeltaprocessor/testdata/config.yaml +++ b/processor/cumulativetodeltaprocessor/testdata/config.yaml @@ -42,6 +42,49 @@ cumulativetodelta/regexp: - b* max_staleness: 10s +cumulativetodelta/metric_type_filter: + include: + match_type: regexp + metrics: + - a* + metric_types: + - Sum + exclude: + match_type: regexp + metrics: + - b* + metric_types: + - Histogram + max_staleness: 10s + +cumulativetodelta/invalid_include_metric_type_filter: + include: + match_type: regexp + metrics: + - a* + metric_types: + - Gauge + exclude: + match_type: regexp + metrics: + - b* + metric_types: + - Histogram + max_staleness: 10s + +cumulativetodelta/invalid_exclude_metric_type_filter: + include: + match_type: regexp + metrics: + - a* + exclude: + match_type: regexp + metrics: + - b* + metric_types: + - Invalid + max_staleness: 10s + cumulativetodelta/auto: initial_value: auto From 9a4764b004435ee93df4bddd46b4c2739b9f7e22 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 5 Aug 2024 09:46:51 +0200 Subject: [PATCH 2/9] add unit tests Signed-off-by: Florian Bacher --- .../cumulativetodeltaprocessor/config.go | 19 +- .../processor_test.go | 236 +++++++++++++----- 2 files changed, 185 insertions(+), 70 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 8b4439e14c98..b35883326bcb 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -16,16 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" ) -var validExcludeMetricTypes = []string{ - pmetric.MetricTypeSum.String(), - pmetric.MetricTypeExponentialHistogram.String(), - pmetric.MetricTypeHistogram.String(), - pmetric.MetricTypeEmpty.String(), - pmetric.MetricTypeSummary.String(), - pmetric.MetricTypeGauge.String(), -} - -var validIncludeMetricTypes = []string{ +var validMetricTypes = []string{ pmetric.MetricTypeSum.String(), pmetric.MetricTypeHistogram.String(), } @@ -73,20 +64,20 @@ func (config *Config) Validate() error { } for _, metricType := range config.Exclude.MetricTypes { - if !slices.Contains(validExcludeMetricTypes, metricType) { + if !slices.Contains(validMetricTypes, metricType) { return fmt.Errorf( "found invalid metric type in exclude.metric_types: %s. Valid values are [%s]", metricType, - strings.Join(validExcludeMetricTypes, ","), + strings.Join(validMetricTypes, ","), ) } } for _, metricType := range config.Include.MetricTypes { - if !slices.Contains(validIncludeMetricTypes, metricType) { + if !slices.Contains(validMetricTypes, metricType) { return fmt.Errorf( "found invalid metric type in include.metric_types: %s. Valid values are [%s]", metricType, - strings.Join(validIncludeMetricTypes, ","), + strings.Join(validMetricTypes, ","), ) } } diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index 53441c254cdf..3ae61a36cb18 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -35,6 +35,30 @@ type testSumMetric struct { flags [][]pmetric.DataPointFlags } +func (tm testSumMetric) addToMetrics(ms pmetric.MetricSlice, now time.Time) { + for i, name := range tm.metricNames { + m := ms.AppendEmpty() + m.SetName(name) + sum := m.SetEmptySum() + sum.SetIsMonotonic(tm.isMonotonic[i]) + + if tm.isCumulative[i] { + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } else { + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + } + + for index, value := range tm.metricValues[i] { + dp := m.Sum().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) + dp.SetDoubleValue(value) + if len(tm.flags) > i && len(tm.flags[i]) > index { + dp.SetFlags(tm.flags[i][index]) + } + } + } +} + type testHistogramMetric struct { metricNames []string metricCounts [][]uint64 @@ -46,6 +70,47 @@ type testHistogramMetric struct { flags [][]pmetric.DataPointFlags } +func (tm testHistogramMetric) addToMetrics(ms pmetric.MetricSlice, now time.Time) { + for i, name := range tm.metricNames { + m := ms.AppendEmpty() + m.SetName(name) + hist := m.SetEmptyHistogram() + + if tm.isCumulative[i] { + hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } else { + hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + } + + for index, count := range tm.metricCounts[i] { + dp := m.Histogram().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) + dp.SetCount(count) + + sums := tm.metricSums[i] + if len(sums) > 0 { + dp.SetSum(sums[index]) + } + if tm.metricMins != nil { + mins := tm.metricMins[i] + if len(mins) > 0 { + dp.SetMin(mins[index]) + } + } + if tm.metricMaxes != nil { + maxes := tm.metricMaxes[i] + if len(maxes) > 0 { + dp.SetMax(maxes[index]) + } + } + dp.BucketCounts().FromRaw(tm.metricBuckets[i][index]) + if len(tm.flags) > i && len(tm.flags[i]) > index { + dp.SetFlags(tm.flags[i][index]) + } + } + } +} + type cumulativeToDeltaTest struct { name string include MatchMetrics @@ -436,6 +501,109 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { isMonotonic: []bool{true}, }), }, + { + name: "cumulative_to_delta_exclude_sum_metrics", + include: MatchMetrics{}, + exclude: MatchMetrics{ + MetricTypes: []string{"Sum"}, + }, + inMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true, true}, + isMonotonic: []bool{true, true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{0, 100, 200, 500}}, + metricSums: [][]float64{{0, 100, 200, 500}}, + metricBuckets: [][][]uint64{ + {{0, 0, 0}, {50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + }, + metricMins: [][]float64{ + {0, 5.0, 2.0, 3.0}, + }, + metricMaxes: [][]float64{ + {0, 800.0, 825.0, 800.0}, + }, + isCumulative: []bool{true}, + }, + ), + outMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true}, + isMonotonic: []bool{true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}}, + metricSums: [][]float64{{100, 100, 300}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + }, + metricMins: [][]float64{ + nil, + }, + metricMaxes: [][]float64{ + nil, + }, + isCumulative: []bool{false}, + }), + }, + { + name: "cumulative_to_delta_include_histogram_metrics", + include: MatchMetrics{ + MetricTypes: []string{"Histogram"}, + }, + inMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true, true}, + isMonotonic: []bool{true, true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{0, 100, 200, 500}}, + metricSums: [][]float64{{0, 100, 200, 500}}, + metricBuckets: [][][]uint64{ + {{0, 0, 0}, {50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + }, + metricMins: [][]float64{ + {0, 5.0, 2.0, 3.0}, + }, + metricMaxes: [][]float64{ + {0, 800.0, 825.0, 800.0}, + }, + isCumulative: []bool{true}, + }, + ), + outMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true}, + isMonotonic: []bool{true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}}, + metricSums: [][]float64{{100, 100, 300}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + }, + metricMins: [][]float64{ + nil, + }, + metricMaxes: [][]float64{ + nil, + }, + isCumulative: []bool{false}, + }), + }, } for _, test := range testCases { @@ -540,27 +708,7 @@ func generateTestSumMetrics(tm testSumMetric) pmetric.Metrics { rm := md.ResourceMetrics().AppendEmpty() ms := rm.ScopeMetrics().AppendEmpty().Metrics() - for i, name := range tm.metricNames { - m := ms.AppendEmpty() - m.SetName(name) - sum := m.SetEmptySum() - sum.SetIsMonotonic(tm.isMonotonic[i]) - - if tm.isCumulative[i] { - sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - } else { - sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) - } - - for index, value := range tm.metricValues[i] { - dp := m.Sum().DataPoints().AppendEmpty() - dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) - dp.SetDoubleValue(value) - if len(tm.flags) > i && len(tm.flags[i]) > index { - dp.SetFlags(tm.flags[i][index]) - } - } - } + tm.addToMetrics(ms, now) return md } @@ -571,44 +719,20 @@ func generateTestHistogramMetrics(tm testHistogramMetric) pmetric.Metrics { rm := md.ResourceMetrics().AppendEmpty() ms := rm.ScopeMetrics().AppendEmpty().Metrics() - for i, name := range tm.metricNames { - m := ms.AppendEmpty() - m.SetName(name) - hist := m.SetEmptyHistogram() + tm.addToMetrics(ms, now) - if tm.isCumulative[i] { - hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - } else { - hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) - } + return md +} - for index, count := range tm.metricCounts[i] { - dp := m.Histogram().DataPoints().AppendEmpty() - dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) - dp.SetCount(count) +func generateMixedTestMetrics(tsm testSumMetric, thm testHistogramMetric) pmetric.Metrics { + md := pmetric.NewMetrics() + now := time.Now() - sums := tm.metricSums[i] - if len(sums) > 0 { - dp.SetSum(sums[index]) - } - if tm.metricMins != nil { - mins := tm.metricMins[i] - if len(mins) > 0 { - dp.SetMin(mins[index]) - } - } - if tm.metricMaxes != nil { - maxes := tm.metricMaxes[i] - if len(maxes) > 0 { - dp.SetMax(maxes[index]) - } - } - dp.BucketCounts().FromRaw(tm.metricBuckets[i][index]) - if len(tm.flags) > i && len(tm.flags[i]) > index { - dp.SetFlags(tm.flags[i][index]) - } - } - } + rm := md.ResourceMetrics().AppendEmpty() + ms := rm.ScopeMetrics().AppendEmpty().Metrics() + + tsm.addToMetrics(ms, now) + thm.addToMetrics(ms, now) return md } From ed6f12f398abaecc97873a3e951f6aa113d4cee9 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 5 Aug 2024 11:09:17 +0200 Subject: [PATCH 3/9] add docs, changelog entry and additional tests Signed-off-by: Florian Bacher --- ...to-delta-processor-metric-type-filter.yaml | 27 +++++++++++ .../cumulativetodeltaprocessor/README.md | 46 ++++++++++++++++++- .../cumulativetodeltaprocessor/config.go | 14 +++--- .../cumulativetodeltaprocessor/config_test.go | 9 ++-- .../cumulativetodeltaprocessor/processor.go | 5 +- .../processor_test.go | 4 +- .../testdata/config.yaml | 8 ++-- 7 files changed, 92 insertions(+), 21 deletions(-) create mode 100644 .chloggen/cumulative-to-delta-processor-metric-type-filter.yaml diff --git a/.chloggen/cumulative-to-delta-processor-metric-type-filter.yaml b/.chloggen/cumulative-to-delta-processor-metric-type-filter.yaml new file mode 100644 index 000000000000..79f1c9d82175 --- /dev/null +++ b/.chloggen/cumulative-to-delta-processor-metric-type-filter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cumulativetodeltaprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add metric type filter for cumulativetodelta processor + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33673] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 4079e00561f8..9588b92077da 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -22,8 +22,8 @@ Configuration is specified through a list of metrics. The processor uses metric The following settings can be optionally configured: -- `include`: List of metrics names or patterns to convert to delta. -- `exclude`: List of metrics names or patterns to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** +- `include`: List of metrics names, patterns or metric types to convert to delta. +- `exclude`: List of metrics names, patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** - `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 - `initial_value`: Handling of the first observed point for a given metric identity. When the collector (re)starts, there's no record of how much of a given cumulative counter has already been converted to delta values. @@ -55,6 +55,17 @@ processors: match_type: strict ``` +```yaml +processors: + # processor name: cumulativetodelta + cumulativetodelta: + + # Convert all sum metrics + include: + metric_types: + - sum +``` + ```yaml processors: # processor name: cumulativetodelta @@ -68,6 +79,21 @@ processors: match_type: regexp ``` +```yaml +processors: + # processor name: cumulativetodelta + cumulativetodelta: + + # Convert cumulative sum metrics to delta + # if and only if 'metric' is in the name + include: + metrics: + - ".*metric.*" + match_type: regexp + metric_types: + - sum +``` + ```yaml processors: # processor name: cumulativetodelta @@ -81,6 +107,22 @@ processors: match_type: regexp ``` +```yaml +processors: + # processor name: cumulativetodelta + cumulativetodelta: + + # Convert cumulative sum metrics with 'metric' in their name, + # but exclude histogram metrics + include: + metrics: + - ".*metric.*" + match_type: regexp + exclude: + metric_types: + - histogram +``` + ```yaml processors: # processor name: cumulativetodelta diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index b35883326bcb..57616204a3d6 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -17,8 +17,8 @@ import ( ) var validMetricTypes = []string{ - pmetric.MetricTypeSum.String(), - pmetric.MetricTypeHistogram.String(), + strings.ToLower(pmetric.MetricTypeSum.String()), + strings.ToLower(pmetric.MetricTypeHistogram.String()), } // Config defines the configuration for the processor. @@ -63,8 +63,9 @@ func (config *Config) Validate() error { return fmt.Errorf("metrics must be supplied if match_type is set") } - for _, metricType := range config.Exclude.MetricTypes { - if !slices.Contains(validMetricTypes, metricType) { + for i, metricType := range config.Exclude.MetricTypes { + config.Exclude.MetricTypes[i] = strings.ToLower(metricType) + if !slices.Contains(validMetricTypes, config.Exclude.MetricTypes[i]) { return fmt.Errorf( "found invalid metric type in exclude.metric_types: %s. Valid values are [%s]", metricType, @@ -72,8 +73,9 @@ func (config *Config) Validate() error { ) } } - for _, metricType := range config.Include.MetricTypes { - if !slices.Contains(validMetricTypes, metricType) { + for i, metricType := range config.Include.MetricTypes { + config.Include.MetricTypes[i] = strings.ToLower(metricType) + if !slices.Contains(validMetricTypes, config.Include.MetricTypes[i]) { return fmt.Errorf( "found invalid metric type in include.metric_types: %s. Valid values are [%s]", metricType, diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index 5c7387e615a0..b77178ff0ff6 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -4,7 +4,6 @@ package cumulativetodeltaprocessor import ( - "go.opentelemetry.io/collector/pdata/pmetric" "path/filepath" "testing" "time" @@ -95,7 +94,7 @@ func TestLoadConfig(t *testing.T) { RegexpConfig: nil, }, MetricTypes: []string{ - pmetric.MetricTypeSum.String(), + "sum", }, }, Exclude: MatchMetrics{ @@ -107,7 +106,7 @@ func TestLoadConfig(t *testing.T) { RegexpConfig: nil, }, MetricTypes: []string{ - pmetric.MetricTypeHistogram.String(), + "histogram", }, }, MaxStaleness: 10 * time.Second, @@ -116,11 +115,11 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(metadata.Type, "invalid_include_metric_type_filter"), - errorMessage: "found invalid metric type in include.metric_types: Gauge. Valid values are [Sum,Histogram]", + errorMessage: "found invalid metric type in include.metric_types: gauge. Valid values are [sum,histogram]", }, { id: component.NewIDWithName(metadata.Type, "invalid_exclude_metric_type_filter"), - errorMessage: "found invalid metric type in exclude.metric_types: Invalid. Valid values are [Sum,ExponentialHistogram,Histogram,Empty,Summary,Gauge]", + errorMessage: "found invalid metric type in exclude.metric_types: Invalid. Valid values are [sum,histogram]", }, { id: component.NewIDWithName(metadata.Type, "missing_match_type"), diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 37aac488bf80..e4ef41da99ab 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -7,6 +7,7 @@ import ( "context" "math" "slices" + "strings" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -118,9 +119,9 @@ func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error { func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metric pmetric.Metric) bool { return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metric.Name())) && + (len(ctdp.includeMetricTypes) == 0 || slices.Contains(ctdp.includeMetricTypes, strings.ToLower(metric.Type().String()))) && (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metric.Name())) && - (ctdp.includeMetricTypes == nil || slices.Contains(ctdp.includeMetricTypes, metric.Type().String())) && - (ctdp.excludeMetricTypes == nil || !slices.Contains(ctdp.excludeMetricTypes, metric.Type().String())) + (len(ctdp.excludeMetricTypes) == 0 || !slices.Contains(ctdp.excludeMetricTypes, strings.ToLower(metric.Type().String()))) } func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in any, baseIdentity tracking.MetricIdentity) { diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index 3ae61a36cb18..e960ec095541 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -505,7 +505,7 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { name: "cumulative_to_delta_exclude_sum_metrics", include: MatchMetrics{}, exclude: MatchMetrics{ - MetricTypes: []string{"Sum"}, + MetricTypes: []string{"sum"}, }, inMetrics: generateMixedTestMetrics( testSumMetric{ @@ -556,7 +556,7 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { { name: "cumulative_to_delta_include_histogram_metrics", include: MatchMetrics{ - MetricTypes: []string{"Histogram"}, + MetricTypes: []string{"histogram"}, }, inMetrics: generateMixedTestMetrics( testSumMetric{ diff --git a/processor/cumulativetodeltaprocessor/testdata/config.yaml b/processor/cumulativetodeltaprocessor/testdata/config.yaml index bbdf062580bc..07945488d5c5 100644 --- a/processor/cumulativetodeltaprocessor/testdata/config.yaml +++ b/processor/cumulativetodeltaprocessor/testdata/config.yaml @@ -48,13 +48,13 @@ cumulativetodelta/metric_type_filter: metrics: - a* metric_types: - - Sum + - sum exclude: match_type: regexp metrics: - b* metric_types: - - Histogram + - histogram max_staleness: 10s cumulativetodelta/invalid_include_metric_type_filter: @@ -63,13 +63,13 @@ cumulativetodelta/invalid_include_metric_type_filter: metrics: - a* metric_types: - - Gauge + - gauge exclude: match_type: regexp metrics: - b* metric_types: - - Histogram + - histogram max_staleness: 10s cumulativetodelta/invalid_exclude_metric_type_filter: From 3921fa40794f3c20a41f89418f78ed8c54a4ca56 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 5 Aug 2024 14:11:30 +0200 Subject: [PATCH 4/9] fix linting Signed-off-by: Florian Bacher --- processor/cumulativetodeltaprocessor/config.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 57616204a3d6..8ef5d4504ed4 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -5,11 +5,12 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "fmt" - "go.opentelemetry.io/collector/pdata/pmetric" "slices" "strings" "time" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/component" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" From 55985d88a78eb07024309db68e77c8fee0988f71 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 5 Aug 2024 14:19:22 +0200 Subject: [PATCH 5/9] fix linting Signed-off-by: Florian Bacher --- processor/cumulativetodeltaprocessor/config.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 8ef5d4504ed4..88dc76b4834f 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -9,9 +9,8 @@ import ( "strings" "time" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" From 172d902413a0e11daa41e324522d48f21b4e1fb0 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 4 Nov 2024 12:41:00 +0100 Subject: [PATCH 6/9] incorporate feedback from PR review Signed-off-by: Florian Bacher --- .../cumulativetodeltaprocessor/README.md | 4 +-- .../cumulativetodeltaprocessor/config.go | 24 +++++++------- processor/cumulativetodeltaprocessor/go.mod | 1 + processor/cumulativetodeltaprocessor/go.sum | 2 ++ .../cumulativetodeltaprocessor/processor.go | 32 +++++++++++++------ 5 files changed, 39 insertions(+), 24 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 826a95ff1c84..20f36983d8e8 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -23,8 +23,8 @@ Configuration is specified through a list of metrics. The processor uses metric The following settings can be optionally configured: -- `include`: List of metrics names, patterns or metric types to convert to delta. -- `exclude`: List of metrics names, patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** +- `include`: List of metrics names (case-insensitive), patterns or metric types to convert to delta. Valid values are: `sum`, `histogram`. +- `exclude`: List of metrics names (case-insensitive), patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** Valid values are: `sum`, `histogram`. - `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 - `initial_value`: Handling of the first observed point for a given metric identity. When the collector (re)starts, there's no record of how much of a given cumulative counter has already been converted to delta values. diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 88dc76b4834f..ed6f39f257f6 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -5,7 +5,7 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "fmt" - "slices" + "golang.org/x/exp/maps" "strings" "time" @@ -16,11 +16,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" ) -var validMetricTypes = []string{ - strings.ToLower(pmetric.MetricTypeSum.String()), - strings.ToLower(pmetric.MetricTypeHistogram.String()), +var validMetricTypes = map[string]bool{ + strings.ToLower(pmetric.MetricTypeSum.String()): true, + strings.ToLower(pmetric.MetricTypeHistogram.String()): true, } +var validMetricTypeList = maps.Keys(validMetricTypes) + // Config defines the configuration for the processor. type Config struct { // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. @@ -63,23 +65,21 @@ func (config *Config) Validate() error { return fmt.Errorf("metrics must be supplied if match_type is set") } - for i, metricType := range config.Exclude.MetricTypes { - config.Exclude.MetricTypes[i] = strings.ToLower(metricType) - if !slices.Contains(validMetricTypes, config.Exclude.MetricTypes[i]) { + for _, metricType := range config.Exclude.MetricTypes { + if valid := validMetricTypes[strings.ToLower(metricType)]; !valid { return fmt.Errorf( "found invalid metric type in exclude.metric_types: %s. Valid values are [%s]", metricType, - strings.Join(validMetricTypes, ","), + strings.Join(validMetricTypeList, ","), ) } } - for i, metricType := range config.Include.MetricTypes { - config.Include.MetricTypes[i] = strings.ToLower(metricType) - if !slices.Contains(validMetricTypes, config.Include.MetricTypes[i]) { + for _, metricType := range config.Include.MetricTypes { + if valid := validMetricTypes[strings.ToLower(metricType)]; !valid { return fmt.Errorf( "found invalid metric type in include.metric_types: %s. Valid values are [%s]", metricType, - strings.Join(validMetricTypes, ","), + strings.Join(validMetricTypeList, ","), ) } } diff --git a/processor/cumulativetodeltaprocessor/go.mod b/processor/cumulativetodeltaprocessor/go.mod index d4f9b386a093..74bb448cf416 100644 --- a/processor/cumulativetodeltaprocessor/go.mod +++ b/processor/cumulativetodeltaprocessor/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/collector/processor/processortest v0.112.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 ) require ( diff --git a/processor/cumulativetodeltaprocessor/go.sum b/processor/cumulativetodeltaprocessor/go.sum index c541e4c42b6c..74884ad65d31 100644 --- a/processor/cumulativetodeltaprocessor/go.sum +++ b/processor/cumulativetodeltaprocessor/go.sum @@ -99,6 +99,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 0e4150d03339..37ec0ae80675 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -5,12 +5,10 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" - "math" - "slices" - "strings" - "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" + "math" + "strings" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" @@ -19,8 +17,8 @@ import ( type cumulativeToDeltaProcessor struct { includeFS filterset.FilterSet excludeFS filterset.FilterSet - includeMetricTypes []string - excludeMetricTypes []string + includeMetricTypes map[pmetric.MetricType]bool + excludeMetricTypes map[pmetric.MetricType]bool logger *zap.Logger deltaCalculator *tracking.MetricTracker cancelFunc context.CancelFunc @@ -32,8 +30,8 @@ func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulati logger: logger, deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue), cancelFunc: cancel, - includeMetricTypes: config.Include.MetricTypes, - excludeMetricTypes: config.Exclude.MetricTypes, + includeMetricTypes: getMetricTypeFilter(config.Include.MetricTypes), + excludeMetricTypes: getMetricTypeFilter(config.Exclude.MetricTypes), } if len(config.Include.Metrics) > 0 { p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config) @@ -44,6 +42,20 @@ func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulati return p } +func getMetricTypeFilter(types []string) map[pmetric.MetricType]bool { + res := map[pmetric.MetricType]bool{} + for _, t := range types { + switch strings.ToLower(t) { + case strings.ToLower(pmetric.MetricTypeSum.String()): + res[pmetric.MetricTypeSum] = true + case strings.ToLower(pmetric.MetricTypeHistogram.String()): + res[pmetric.MetricTypeHistogram] = true + default: + } + } + return res +} + // processMetrics implements the ProcessMetricsFunc type. func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { @@ -119,9 +131,9 @@ func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error { func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metric pmetric.Metric) bool { return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metric.Name())) && - (len(ctdp.includeMetricTypes) == 0 || slices.Contains(ctdp.includeMetricTypes, strings.ToLower(metric.Type().String()))) && + (len(ctdp.includeMetricTypes) == 0 || ctdp.includeMetricTypes[metric.Type()]) && (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metric.Name())) && - (len(ctdp.excludeMetricTypes) == 0 || !slices.Contains(ctdp.excludeMetricTypes, strings.ToLower(metric.Type().String()))) + (len(ctdp.excludeMetricTypes) == 0 || !ctdp.excludeMetricTypes[metric.Type()]) } func (ctdp *cumulativeToDeltaProcessor) convertNumberDataPoints(dps pmetric.NumberDataPointSlice, baseIdentity tracking.MetricIdentity) { From 0554bbdcad6eda135a28d08c6195b777c28631b4 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 4 Nov 2024 13:42:29 +0100 Subject: [PATCH 7/9] fix linting Signed-off-by: Florian Bacher --- processor/cumulativetodeltaprocessor/config.go | 2 +- processor/cumulativetodeltaprocessor/processor.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index ed6f39f257f6..91e412ec895d 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -5,12 +5,12 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "fmt" - "golang.org/x/exp/maps" "strings" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pmetric" + "golang.org/x/exp/maps" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 37ec0ae80675..f81c61928af1 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -5,11 +5,12 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/zap" "math" "strings" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" ) From 6ff04939a1ca2aef9e5858d5588d5ffec0a0454d Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 4 Nov 2024 14:01:40 +0100 Subject: [PATCH 8/9] fix unit test Signed-off-by: Florian Bacher --- processor/cumulativetodeltaprocessor/config.go | 8 ++++---- processor/cumulativetodeltaprocessor/config_test.go | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 91e412ec895d..adcc81090f2d 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -68,18 +68,18 @@ func (config *Config) Validate() error { for _, metricType := range config.Exclude.MetricTypes { if valid := validMetricTypes[strings.ToLower(metricType)]; !valid { return fmt.Errorf( - "found invalid metric type in exclude.metric_types: %s. Valid values are [%s]", + "found invalid metric type in exclude.metric_types: %s. Valid values are %s", metricType, - strings.Join(validMetricTypeList, ","), + validMetricTypeList, ) } } for _, metricType := range config.Include.MetricTypes { if valid := validMetricTypes[strings.ToLower(metricType)]; !valid { return fmt.Errorf( - "found invalid metric type in include.metric_types: %s. Valid values are [%s]", + "found invalid metric type in include.metric_types: %s. Valid values are %s", metricType, - strings.Join(validMetricTypeList, ","), + validMetricTypeList, ) } } diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index b77178ff0ff6..337c2fc13c37 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -4,6 +4,7 @@ package cumulativetodeltaprocessor import ( + "fmt" "path/filepath" "testing" "time" @@ -115,11 +116,11 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(metadata.Type, "invalid_include_metric_type_filter"), - errorMessage: "found invalid metric type in include.metric_types: gauge. Valid values are [sum,histogram]", + errorMessage: fmt.Sprintf("found invalid metric type in include.metric_types: gauge. Valid values are %s", validMetricTypeList), }, { id: component.NewIDWithName(metadata.Type, "invalid_exclude_metric_type_filter"), - errorMessage: "found invalid metric type in exclude.metric_types: Invalid. Valid values are [sum,histogram]", + errorMessage: fmt.Sprintf("found invalid metric type in exclude.metric_types: Invalid. Valid values are %s", validMetricTypeList), }, { id: component.NewIDWithName(metadata.Type, "missing_match_type"), From 74f09ae063bf401c836ca818a3bd9da03a4efcd4 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 17 Dec 2024 08:28:44 +0100 Subject: [PATCH 9/9] return error if metric type filter contains an unsupported type Signed-off-by: Florian Bacher --- .../cumulativetodeltaprocessor/factory.go | 5 ++- .../factory_test.go | 7 ++++ .../cumulativetodeltaprocessor/processor.go | 37 ++++++++++++++----- .../processor_test.go | 22 +++++++++++ 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/factory.go b/processor/cumulativetodeltaprocessor/factory.go index 24ffc9c3e334..21a0af4a04b5 100644 --- a/processor/cumulativetodeltaprocessor/factory.go +++ b/processor/cumulativetodeltaprocessor/factory.go @@ -40,7 +40,10 @@ func createMetricsProcessor( return nil, fmt.Errorf("configuration parsing error") } - metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, set.Logger) + metricsProcessor, err := newCumulativeToDeltaProcessor(processorConfig, set.Logger) + if err != nil { + return nil, err + } return processorhelper.NewMetrics( ctx, diff --git a/processor/cumulativetodeltaprocessor/factory_test.go b/processor/cumulativetodeltaprocessor/factory_test.go index 6926b4257d6d..b309bc430396 100644 --- a/processor/cumulativetodeltaprocessor/factory_test.go +++ b/processor/cumulativetodeltaprocessor/factory_test.go @@ -6,6 +6,7 @@ package cumulativetodeltaprocessor import ( "context" "path/filepath" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -61,6 +62,12 @@ func TestCreateProcessors(t *testing.T) { processortest.NewNopSettings(), cfg, consumertest.NewNop()) + + if strings.Contains(k, "invalid") { + assert.Error(t, mErr) + assert.Nil(t, mp) + return + } assert.NotNil(t, mp) assert.NoError(t, mErr) assert.NoError(t, mp.Shutdown(context.Background())) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index f81c61928af1..78bfbaf3fd1c 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -5,6 +5,7 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" + "fmt" "math" "strings" @@ -25,14 +26,12 @@ type cumulativeToDeltaProcessor struct { cancelFunc context.CancelFunc } -func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { +func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) (*cumulativeToDeltaProcessor, error) { ctx, cancel := context.WithCancel(context.Background()) + p := &cumulativeToDeltaProcessor{ - logger: logger, - deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue), - cancelFunc: cancel, - includeMetricTypes: getMetricTypeFilter(config.Include.MetricTypes), - excludeMetricTypes: getMetricTypeFilter(config.Exclude.MetricTypes), + logger: logger, + cancelFunc: cancel, } if len(config.Include.Metrics) > 0 { p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config) @@ -40,10 +39,29 @@ func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulati if len(config.Exclude.Metrics) > 0 { p.excludeFS, _ = filterset.CreateFilterSet(config.Exclude.Metrics, &config.Exclude.Config) } - return p + + if len(config.Include.MetricTypes) > 0 { + includeMetricTypeFilter, err := getMetricTypeFilter(config.Include.MetricTypes) + if err != nil { + return nil, err + } + p.includeMetricTypes = includeMetricTypeFilter + } + + if len(config.Exclude.MetricTypes) > 0 { + excludeMetricTypeFilter, err := getMetricTypeFilter(config.Exclude.MetricTypes) + if err != nil { + return nil, err + } + p.excludeMetricTypes = excludeMetricTypeFilter + } + + p.deltaCalculator = tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue) + + return p, nil } -func getMetricTypeFilter(types []string) map[pmetric.MetricType]bool { +func getMetricTypeFilter(types []string) (map[pmetric.MetricType]bool, error) { res := map[pmetric.MetricType]bool{} for _, t := range types { switch strings.ToLower(t) { @@ -52,9 +70,10 @@ func getMetricTypeFilter(types []string) map[pmetric.MetricType]bool { case strings.ToLower(pmetric.MetricTypeHistogram.String()): res[pmetric.MetricTypeHistogram] = true default: + return nil, fmt.Errorf("unsupported metric type filter: %s", t) } } - return res + return res, nil } // processMetrics implements the ProcessMetricsFunc type. diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index d515753c8de5..c36a8f06cb0b 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -5,6 +5,7 @@ package cumulativetodeltaprocessor import ( "context" + "errors" "math" "testing" "time" @@ -117,6 +118,7 @@ type cumulativeToDeltaTest struct { exclude MatchMetrics inMetrics pmetric.Metrics outMetrics pmetric.Metrics + wantError error } func TestCumulativeToDeltaProcessor(t *testing.T) { @@ -604,6 +606,20 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { isCumulative: []bool{false}, }), }, + { + name: "cumulative_to_delta_unsupported_include_metric_type", + include: MatchMetrics{ + MetricTypes: []string{"summary"}, + }, + wantError: errors.New("unsupported metric type filter: summary"), + }, + { + name: "cumulative_to_delta_unsupported_exclude_metric_type", + include: MatchMetrics{ + MetricTypes: []string{"summary"}, + }, + wantError: errors.New("unsupported metric type filter: summary"), + }, } for _, test := range testCases { @@ -621,6 +637,12 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { cfg, next, ) + + if test.wantError != nil { + require.ErrorContains(t, err, test.wantError.Error()) + require.Nil(t, mgp) + return + } assert.NotNil(t, mgp) assert.NoError(t, err)