From de3ef01bff4f1e4db0eee94abce862c66582f208 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 10 Apr 2024 03:35:20 -0700 Subject: [PATCH] [service] Validate pipeline type against component types (#9257) **Description:** This change adds another layer of validation to pipelines. It validates that all the components in a pipeline are of the same type as the pipeline. For example, if a `metrics` pipeline contains a `traces`-only receiver, the `otelcol validate -config ...` command will fail. **Link to tracking Issue:** Fixes #8007. **Testing:** Added unit test + existing tests are passing. **Documentation:** godoc. --------- Co-authored-by: Pablo Baeyens --- .chloggen/validate-pipeline-types.yaml | 25 ++++++ otelcol/collector.go | 26 +++++- otelcol/collector_test.go | 41 +++++++-- otelcol/command_components.go | 85 ++++++++++++------- otelcol/factories_test.go | 3 +- otelcol/testdata/components-output.yaml | 5 ++ .../otelcol-invalid-receiver-type.yaml | 18 ++++ receiver/receivertest/nop_receiver.go | 35 ++++++-- 8 files changed, 191 insertions(+), 47 deletions(-) create mode 100644 .chloggen/validate-pipeline-types.yaml create mode 100644 otelcol/testdata/otelcol-invalid-receiver-type.yaml diff --git a/.chloggen/validate-pipeline-types.yaml b/.chloggen/validate-pipeline-types.yaml new file mode 100644 index 00000000000..03146a53711 --- /dev/null +++ b/.chloggen/validate-pipeline-types.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Validate pipeline type against component types + +# One or more tracking issues or pull requests related to the change +issues: [8007] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/otelcol/collector.go b/otelcol/collector.go index a1500f5b81c..8e50749f449 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -236,7 +236,11 @@ func (col *Collector) DryRun(ctx context.Context) error { return fmt.Errorf("failed to get config: %w", err) } - return cfg.Validate() + if err := cfg.Validate(); err != nil { + return err + } + + return col.validatePipelineCfg(ctx, cfg, factories) } // Run starts the collector according to the given configuration, and waits for it to complete. @@ -314,3 +318,23 @@ func (col *Collector) shutdown(ctx context.Context) error { func (col *Collector) setCollectorState(state State) { col.state.Store(int32(state)) } + +// validatePipelineConfig validates that the components in a pipeline support the +// signal type of the pipeline. For example, this function will return an error if +// a metrics pipeline has non-metrics components. +func (col *Collector) validatePipelineCfg(ctx context.Context, cfg *Config, factories Factories) error { + set := service.Settings{ + Receivers: receiver.NewBuilder(cfg.Receivers, factories.Receivers), + Processors: processor.NewBuilder(cfg.Processors, factories.Processors), + Exporters: exporter.NewBuilder(cfg.Exporters, factories.Exporters), + Connectors: connector.NewBuilder(cfg.Connectors, factories.Connectors), + Extensions: extension.NewBuilder(cfg.Extensions, factories.Extensions), + } + + _, err := service.New(ctx, set, cfg.Service) + if err != nil { + return err + } + + return nil +} diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index c56d56fc728..94a728add09 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -421,16 +421,41 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) { } func TestCollectorDryRun(t *testing.T) { - // Load a bad config causing startup to fail - set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}), + tests := map[string]struct { + settings CollectorSettings + expectedErr string + }{ + "invalid_processor": { + settings: CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}), + }, + expectedErr: `service::pipelines::traces: references processor "invalid" which is not configured`, + }, + "logs_receiver_traces_pipeline": { + settings: CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid-receiver-type.yaml")}), + }, + expectedErr: `failed to build pipelines: failed to create "nop_logs" receiver for data type "traces": telemetry type is not supported`, + }, } - col, err := NewCollector(set) - require.NoError(t, err) - require.Error(t, col.DryRun(context.Background())) + for name, test := range tests { + t.Run(name, func(t *testing.T) { + col, err := NewCollector(test.settings) + require.NoError(t, err) + + err = col.DryRun(context.Background()) + if test.expectedErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, test.expectedErr) + } + }) + } } func TestPassConfmapToServiceFailure(t *testing.T) { diff --git a/otelcol/command_components.go b/otelcol/command_components.go index 83dd4670375..da13d6779a7 100644 --- a/otelcol/command_components.go +++ b/otelcol/command_components.go @@ -5,11 +5,17 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( "fmt" + "sort" "github.com/spf13/cobra" "gopkg.in/yaml.v3" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/receiver" ) type componentWithStability struct { @@ -41,59 +47,59 @@ func newComponentsCommand(set CollectorSettings) *cobra.Command { } components := componentsOutput{} - for con := range factories.Connectors { + for _, con := range sortFactoriesByType[connector.Factory](factories.Connectors) { components.Connectors = append(components.Connectors, componentWithStability{ - Name: con, + Name: con.Type(), Stability: map[string]string{ - "logs-to-logs": factories.Connectors[con].LogsToLogsStability().String(), - "logs-to-metrics": factories.Connectors[con].LogsToMetricsStability().String(), - "logs-to-traces": factories.Connectors[con].LogsToTracesStability().String(), + "logs-to-logs": con.LogsToLogsStability().String(), + "logs-to-metrics": con.LogsToMetricsStability().String(), + "logs-to-traces": con.LogsToTracesStability().String(), - "metrics-to-logs": factories.Connectors[con].MetricsToLogsStability().String(), - "metrics-to-metrics": factories.Connectors[con].MetricsToMetricsStability().String(), - "metrics-to-traces": factories.Connectors[con].MetricsToTracesStability().String(), + "metrics-to-logs": con.MetricsToLogsStability().String(), + "metrics-to-metrics": con.MetricsToMetricsStability().String(), + "metrics-to-traces": con.MetricsToTracesStability().String(), - "traces-to-logs": factories.Connectors[con].TracesToLogsStability().String(), - "traces-to-metrics": factories.Connectors[con].TracesToMetricsStability().String(), - "traces-to-traces": factories.Connectors[con].TracesToTracesStability().String(), + "traces-to-logs": con.TracesToLogsStability().String(), + "traces-to-metrics": con.TracesToMetricsStability().String(), + "traces-to-traces": con.TracesToTracesStability().String(), }, }) } - for ext := range factories.Extensions { + for _, ext := range sortFactoriesByType[extension.Factory](factories.Extensions) { components.Extensions = append(components.Extensions, componentWithStability{ - Name: ext, + Name: ext.Type(), Stability: map[string]string{ - "extension": factories.Extensions[ext].ExtensionStability().String(), + "extension": ext.ExtensionStability().String(), }, }) } - for prs := range factories.Processors { + for _, prs := range sortFactoriesByType[processor.Factory](factories.Processors) { components.Processors = append(components.Processors, componentWithStability{ - Name: prs, + Name: prs.Type(), Stability: map[string]string{ - "logs": factories.Processors[prs].LogsProcessorStability().String(), - "metrics": factories.Processors[prs].MetricsProcessorStability().String(), - "traces": factories.Processors[prs].TracesProcessorStability().String(), + "logs": prs.LogsProcessorStability().String(), + "metrics": prs.MetricsProcessorStability().String(), + "traces": prs.TracesProcessorStability().String(), }, }) } - for rcv := range factories.Receivers { + for _, rcv := range sortFactoriesByType[receiver.Factory](factories.Receivers) { components.Receivers = append(components.Receivers, componentWithStability{ - Name: rcv, + Name: rcv.Type(), Stability: map[string]string{ - "logs": factories.Receivers[rcv].LogsReceiverStability().String(), - "metrics": factories.Receivers[rcv].MetricsReceiverStability().String(), - "traces": factories.Receivers[rcv].TracesReceiverStability().String(), + "logs": rcv.LogsReceiverStability().String(), + "metrics": rcv.MetricsReceiverStability().String(), + "traces": rcv.TracesReceiverStability().String(), }, }) } - for exp := range factories.Exporters { + for _, exp := range sortFactoriesByType[exporter.Factory](factories.Exporters) { components.Exporters = append(components.Exporters, componentWithStability{ - Name: exp, + Name: exp.Type(), Stability: map[string]string{ - "logs": factories.Exporters[exp].LogsExporterStability().String(), - "metrics": factories.Exporters[exp].MetricsExporterStability().String(), - "traces": factories.Exporters[exp].TracesExporterStability().String(), + "logs": exp.LogsExporterStability().String(), + "metrics": exp.MetricsExporterStability().String(), + "traces": exp.TracesExporterStability().String(), }, }) } @@ -107,3 +113,24 @@ func newComponentsCommand(set CollectorSettings) *cobra.Command { }, } } + +func sortFactoriesByType[T component.Factory](factories map[component.Type]T) []T { + // Gather component types (factories map keys) + componentTypes := make([]component.Type, 0, len(factories)) + for componentType := range factories { + componentTypes = append(componentTypes, componentType) + } + + // Sort component types as strings + sort.Slice(componentTypes, func(i, j int) bool { + return componentTypes[i].String() < componentTypes[j].String() + }) + + // Build and return list of factories, sorted by component types + sortedFactories := make([]T, 0, len(factories)) + for _, componentType := range componentTypes { + sortedFactories = append(sortedFactories, factories[componentType]) + } + + return sortedFactories +} diff --git a/otelcol/factories_test.go b/otelcol/factories_test.go index 645d59c7db2..965ea91844b 100644 --- a/otelcol/factories_test.go +++ b/otelcol/factories_test.go @@ -4,6 +4,7 @@ package otelcol import ( + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/connector/connectortest" "go.opentelemetry.io/collector/exporter" @@ -28,7 +29,7 @@ func nopFactories() (Factories, error) { return Factories{}, err } - if factories.Receivers, err = receiver.MakeFactoryMap(receivertest.NewNopFactory()); err != nil { + if factories.Receivers, err = receiver.MakeFactoryMap(receivertest.NewNopFactory(), receivertest.NewNopFactoryForType(component.DataTypeLogs)); err != nil { return Factories{}, err } diff --git a/otelcol/testdata/components-output.yaml b/otelcol/testdata/components-output.yaml index bfac28484ec..929ae01945b 100644 --- a/otelcol/testdata/components-output.yaml +++ b/otelcol/testdata/components-output.yaml @@ -8,6 +8,11 @@ receivers: logs: Stable metrics: Stable traces: Stable + - name: nop_logs + stability: + logs: Stable + metrics: Undefined + traces: Undefined processors: - name: nop stability: diff --git a/otelcol/testdata/otelcol-invalid-receiver-type.yaml b/otelcol/testdata/otelcol-invalid-receiver-type.yaml new file mode 100644 index 00000000000..5837810fcca --- /dev/null +++ b/otelcol/testdata/otelcol-invalid-receiver-type.yaml @@ -0,0 +1,18 @@ +receivers: + nop_logs: + +processors: + nop: + +exporters: + nop: + +service: + telemetry: + metrics: + address: localhost:8888 + pipelines: + traces: + receivers: [nop_logs] + processors: [nop] + exporters: [nop] diff --git a/receiver/receivertest/nop_receiver.go b/receiver/receivertest/nop_receiver.go index 024242a864a..e9cec06ca1b 100644 --- a/receiver/receivertest/nop_receiver.go +++ b/receiver/receivertest/nop_receiver.go @@ -14,27 +14,48 @@ import ( "go.opentelemetry.io/collector/receiver" ) -var componentType = component.MustNewType("nop") +var defaultComponentType = component.MustNewType("nop") // NewNopCreateSettings returns a new nop settings for Create*Receiver functions. func NewNopCreateSettings() receiver.CreateSettings { return receiver.CreateSettings{ - ID: component.NewIDWithName(componentType, uuid.NewString()), + ID: component.NewIDWithName(defaultComponentType, uuid.NewString()), TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), } } -// NewNopFactory returns a receiver.Factory that constructs nop receivers. +// NewNopFactory returns a receiver.Factory that constructs nop receivers supporting all data types. func NewNopFactory() receiver.Factory { return receiver.NewFactory( - componentType, + defaultComponentType, func() component.Config { return &nopConfig{} }, receiver.WithTraces(createTraces, component.StabilityLevelStable), receiver.WithMetrics(createMetrics, component.StabilityLevelStable), receiver.WithLogs(createLogs, component.StabilityLevelStable)) } +// NewNopFactoryForType returns a receiver.Factory that constructs nop receivers supporting only the +// given data type. +func NewNopFactoryForType(dataType component.DataType) receiver.Factory { + var factoryOpt receiver.FactoryOption + switch dataType { + case component.DataTypeTraces: + factoryOpt = receiver.WithTraces(createTraces, component.StabilityLevelStable) + case component.DataTypeMetrics: + factoryOpt = receiver.WithMetrics(createMetrics, component.StabilityLevelStable) + case component.DataTypeLogs: + factoryOpt = receiver.WithLogs(createLogs, component.StabilityLevelStable) + default: + panic("unsupported data type for creating nop receiver factory: " + dataType.String()) + } + + componentType := component.MustNewType(defaultComponentType.String() + "_" + dataType.String()) + return receiver.NewFactory(componentType, func() component.Config { return &nopConfig{} }, factoryOpt) +} + +type nopConfig struct{} + func createTraces(context.Context, receiver.CreateSettings, component.Config, consumer.Traces) (receiver.Traces, error) { return nopInstance, nil } @@ -47,8 +68,6 @@ func createLogs(context.Context, receiver.CreateSettings, component.Config, cons return nopInstance, nil } -type nopConfig struct{} - var nopInstance = &nopReceiver{} // nopReceiver acts as a receiver for testing purposes. @@ -61,6 +80,6 @@ type nopReceiver struct { func NewNopBuilder() *receiver.Builder { nopFactory := NewNopFactory() return receiver.NewBuilder( - map[component.ID]component.Config{component.NewID(componentType): nopFactory.CreateDefaultConfig()}, - map[component.Type]receiver.Factory{componentType: nopFactory}) + map[component.ID]component.Config{component.NewID(defaultComponentType): nopFactory.CreateDefaultConfig()}, + map[component.Type]receiver.Factory{defaultComponentType: nopFactory}) }