diff --git a/processor/cumulativetodeltaprocessor/factory.go b/processor/cumulativetodeltaprocessor/factory.go index 24ffc9c3e334..21a0af4a04b5 100644 --- a/processor/cumulativetodeltaprocessor/factory.go +++ b/processor/cumulativetodeltaprocessor/factory.go @@ -40,7 +40,10 @@ func createMetricsProcessor( return nil, fmt.Errorf("configuration parsing error") } - metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, set.Logger) + metricsProcessor, err := newCumulativeToDeltaProcessor(processorConfig, set.Logger) + if err != nil { + return nil, err + } return processorhelper.NewMetrics( ctx, diff --git a/processor/cumulativetodeltaprocessor/factory_test.go b/processor/cumulativetodeltaprocessor/factory_test.go index 6926b4257d6d..b309bc430396 100644 --- a/processor/cumulativetodeltaprocessor/factory_test.go +++ b/processor/cumulativetodeltaprocessor/factory_test.go @@ -6,6 +6,7 @@ package cumulativetodeltaprocessor import ( "context" "path/filepath" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -61,6 +62,12 @@ func TestCreateProcessors(t *testing.T) { processortest.NewNopSettings(), cfg, consumertest.NewNop()) + + if strings.Contains(k, "invalid") { + assert.Error(t, mErr) + assert.Nil(t, mp) + return + } assert.NotNil(t, mp) assert.NoError(t, mErr) assert.NoError(t, mp.Shutdown(context.Background())) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index f81c61928af1..78bfbaf3fd1c 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -5,6 +5,7 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" + "fmt" "math" "strings" @@ -25,14 +26,12 @@ type cumulativeToDeltaProcessor struct { cancelFunc context.CancelFunc } -func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { +func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) (*cumulativeToDeltaProcessor, error) { ctx, cancel := context.WithCancel(context.Background()) + p := &cumulativeToDeltaProcessor{ - logger: logger, - deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue), - cancelFunc: cancel, - includeMetricTypes: getMetricTypeFilter(config.Include.MetricTypes), - excludeMetricTypes: getMetricTypeFilter(config.Exclude.MetricTypes), + logger: logger, + cancelFunc: cancel, } if len(config.Include.Metrics) > 0 { p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config) @@ -40,10 +39,29 @@ func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulati if len(config.Exclude.Metrics) > 0 { p.excludeFS, _ = filterset.CreateFilterSet(config.Exclude.Metrics, &config.Exclude.Config) } - return p + + if len(config.Include.MetricTypes) > 0 { + includeMetricTypeFilter, err := getMetricTypeFilter(config.Include.MetricTypes) + if err != nil { + return nil, err + } + p.includeMetricTypes = includeMetricTypeFilter + } + + if len(config.Exclude.MetricTypes) > 0 { + excludeMetricTypeFilter, err := getMetricTypeFilter(config.Exclude.MetricTypes) + if err != nil { + return nil, err + } + p.excludeMetricTypes = excludeMetricTypeFilter + } + + p.deltaCalculator = tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue) + + return p, nil } -func getMetricTypeFilter(types []string) map[pmetric.MetricType]bool { +func getMetricTypeFilter(types []string) (map[pmetric.MetricType]bool, error) { res := map[pmetric.MetricType]bool{} for _, t := range types { switch strings.ToLower(t) { @@ -52,9 +70,10 @@ func getMetricTypeFilter(types []string) map[pmetric.MetricType]bool { case strings.ToLower(pmetric.MetricTypeHistogram.String()): res[pmetric.MetricTypeHistogram] = true default: + return nil, fmt.Errorf("unsupported metric type filter: %s", t) } } - return res + return res, nil } // processMetrics implements the ProcessMetricsFunc type. diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index d515753c8de5..c36a8f06cb0b 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -5,6 +5,7 @@ package cumulativetodeltaprocessor import ( "context" + "errors" "math" "testing" "time" @@ -117,6 +118,7 @@ type cumulativeToDeltaTest struct { exclude MatchMetrics inMetrics pmetric.Metrics outMetrics pmetric.Metrics + wantError error } func TestCumulativeToDeltaProcessor(t *testing.T) { @@ -604,6 +606,20 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { isCumulative: []bool{false}, }), }, + { + name: "cumulative_to_delta_unsupported_include_metric_type", + include: MatchMetrics{ + MetricTypes: []string{"summary"}, + }, + wantError: errors.New("unsupported metric type filter: summary"), + }, + { + name: "cumulative_to_delta_unsupported_exclude_metric_type", + include: MatchMetrics{ + MetricTypes: []string{"summary"}, + }, + wantError: errors.New("unsupported metric type filter: summary"), + }, } for _, test := range testCases { @@ -621,6 +637,12 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { cfg, next, ) + + if test.wantError != nil { + require.ErrorContains(t, err, test.wantError.Error()) + require.Nil(t, mgp) + return + } assert.NotNil(t, mgp) assert.NoError(t, err)