Skip to content

Commit

Permalink
[service] Validate pipeline type against component types (#9257)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

This change adds another layer of validation to pipelines. It validates
that all the components in a pipeline are of the same type as the
pipeline.

For example, if a `metrics` pipeline contains a `traces`-only receiver,
the `otelcol validate -config ...` command will fail.

**Link to tracking Issue:** 
Fixes #8007.

**Testing:** 
Added unit test + existing tests are passing.

**Documentation:**
godoc.

---------

Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
ycombinator and mx-psi authored Apr 10, 2024
1 parent cad5c63 commit de3ef01
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 47 deletions.
25 changes: 25 additions & 0 deletions .chloggen/validate-pipeline-types.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Validate pipeline type against component types

# One or more tracking issues or pull requests related to the change
issues: [8007]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
26 changes: 25 additions & 1 deletion otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,11 @@ func (col *Collector) DryRun(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

return cfg.Validate()
if err := cfg.Validate(); err != nil {
return err
}

return col.validatePipelineCfg(ctx, cfg, factories)
}

// Run starts the collector according to the given configuration, and waits for it to complete.
Expand Down Expand Up @@ -314,3 +318,23 @@ func (col *Collector) shutdown(ctx context.Context) error {
func (col *Collector) setCollectorState(state State) {
col.state.Store(int32(state))
}

// validatePipelineConfig validates that the components in a pipeline support the
// signal type of the pipeline. For example, this function will return an error if
// a metrics pipeline has non-metrics components.
func (col *Collector) validatePipelineCfg(ctx context.Context, cfg *Config, factories Factories) error {
set := service.Settings{
Receivers: receiver.NewBuilder(cfg.Receivers, factories.Receivers),
Processors: processor.NewBuilder(cfg.Processors, factories.Processors),
Exporters: exporter.NewBuilder(cfg.Exporters, factories.Exporters),
Connectors: connector.NewBuilder(cfg.Connectors, factories.Connectors),
Extensions: extension.NewBuilder(cfg.Extensions, factories.Extensions),
}

_, err := service.New(ctx, set, cfg.Service)
if err != nil {
return err
}

return nil
}
41 changes: 33 additions & 8 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,16 +421,41 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) {
}

func TestCollectorDryRun(t *testing.T) {
// Load a bad config causing startup to fail
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
tests := map[string]struct {
settings CollectorSettings
expectedErr string
}{
"invalid_processor": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
},
expectedErr: `service::pipelines::traces: references processor "invalid" which is not configured`,
},
"logs_receiver_traces_pipeline": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid-receiver-type.yaml")}),
},
expectedErr: `failed to build pipelines: failed to create "nop_logs" receiver for data type "traces": telemetry type is not supported`,
},
}
col, err := NewCollector(set)
require.NoError(t, err)

require.Error(t, col.DryRun(context.Background()))
for name, test := range tests {
t.Run(name, func(t *testing.T) {
col, err := NewCollector(test.settings)
require.NoError(t, err)

err = col.DryRun(context.Background())
if test.expectedErr == "" {
require.NoError(t, err)
} else {
require.EqualError(t, err, test.expectedErr)
}
})
}
}

func TestPassConfmapToServiceFailure(t *testing.T) {
Expand Down
85 changes: 56 additions & 29 deletions otelcol/command_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"fmt"
"sort"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
)

type componentWithStability struct {
Expand Down Expand Up @@ -41,59 +47,59 @@ func newComponentsCommand(set CollectorSettings) *cobra.Command {
}

components := componentsOutput{}
for con := range factories.Connectors {
for _, con := range sortFactoriesByType[connector.Factory](factories.Connectors) {
components.Connectors = append(components.Connectors, componentWithStability{
Name: con,
Name: con.Type(),
Stability: map[string]string{
"logs-to-logs": factories.Connectors[con].LogsToLogsStability().String(),
"logs-to-metrics": factories.Connectors[con].LogsToMetricsStability().String(),
"logs-to-traces": factories.Connectors[con].LogsToTracesStability().String(),
"logs-to-logs": con.LogsToLogsStability().String(),
"logs-to-metrics": con.LogsToMetricsStability().String(),
"logs-to-traces": con.LogsToTracesStability().String(),

"metrics-to-logs": factories.Connectors[con].MetricsToLogsStability().String(),
"metrics-to-metrics": factories.Connectors[con].MetricsToMetricsStability().String(),
"metrics-to-traces": factories.Connectors[con].MetricsToTracesStability().String(),
"metrics-to-logs": con.MetricsToLogsStability().String(),
"metrics-to-metrics": con.MetricsToMetricsStability().String(),
"metrics-to-traces": con.MetricsToTracesStability().String(),

"traces-to-logs": factories.Connectors[con].TracesToLogsStability().String(),
"traces-to-metrics": factories.Connectors[con].TracesToMetricsStability().String(),
"traces-to-traces": factories.Connectors[con].TracesToTracesStability().String(),
"traces-to-logs": con.TracesToLogsStability().String(),
"traces-to-metrics": con.TracesToMetricsStability().String(),
"traces-to-traces": con.TracesToTracesStability().String(),
},
})
}
for ext := range factories.Extensions {
for _, ext := range sortFactoriesByType[extension.Factory](factories.Extensions) {
components.Extensions = append(components.Extensions, componentWithStability{
Name: ext,
Name: ext.Type(),
Stability: map[string]string{
"extension": factories.Extensions[ext].ExtensionStability().String(),
"extension": ext.ExtensionStability().String(),
},
})
}
for prs := range factories.Processors {
for _, prs := range sortFactoriesByType[processor.Factory](factories.Processors) {
components.Processors = append(components.Processors, componentWithStability{
Name: prs,
Name: prs.Type(),
Stability: map[string]string{
"logs": factories.Processors[prs].LogsProcessorStability().String(),
"metrics": factories.Processors[prs].MetricsProcessorStability().String(),
"traces": factories.Processors[prs].TracesProcessorStability().String(),
"logs": prs.LogsProcessorStability().String(),
"metrics": prs.MetricsProcessorStability().String(),
"traces": prs.TracesProcessorStability().String(),
},
})
}
for rcv := range factories.Receivers {
for _, rcv := range sortFactoriesByType[receiver.Factory](factories.Receivers) {
components.Receivers = append(components.Receivers, componentWithStability{
Name: rcv,
Name: rcv.Type(),
Stability: map[string]string{
"logs": factories.Receivers[rcv].LogsReceiverStability().String(),
"metrics": factories.Receivers[rcv].MetricsReceiverStability().String(),
"traces": factories.Receivers[rcv].TracesReceiverStability().String(),
"logs": rcv.LogsReceiverStability().String(),
"metrics": rcv.MetricsReceiverStability().String(),
"traces": rcv.TracesReceiverStability().String(),
},
})
}
for exp := range factories.Exporters {
for _, exp := range sortFactoriesByType[exporter.Factory](factories.Exporters) {
components.Exporters = append(components.Exporters, componentWithStability{
Name: exp,
Name: exp.Type(),
Stability: map[string]string{
"logs": factories.Exporters[exp].LogsExporterStability().String(),
"metrics": factories.Exporters[exp].MetricsExporterStability().String(),
"traces": factories.Exporters[exp].TracesExporterStability().String(),
"logs": exp.LogsExporterStability().String(),
"metrics": exp.MetricsExporterStability().String(),
"traces": exp.TracesExporterStability().String(),
},
})
}
Expand All @@ -107,3 +113,24 @@ func newComponentsCommand(set CollectorSettings) *cobra.Command {
},
}
}

func sortFactoriesByType[T component.Factory](factories map[component.Type]T) []T {
// Gather component types (factories map keys)
componentTypes := make([]component.Type, 0, len(factories))
for componentType := range factories {
componentTypes = append(componentTypes, componentType)
}

// Sort component types as strings
sort.Slice(componentTypes, func(i, j int) bool {
return componentTypes[i].String() < componentTypes[j].String()
})

// Build and return list of factories, sorted by component types
sortedFactories := make([]T, 0, len(factories))
for _, componentType := range componentTypes {
sortedFactories = append(sortedFactories, factories[componentType])
}

return sortedFactories
}
3 changes: 2 additions & 1 deletion otelcol/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package otelcol

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -28,7 +29,7 @@ func nopFactories() (Factories, error) {
return Factories{}, err
}

if factories.Receivers, err = receiver.MakeFactoryMap(receivertest.NewNopFactory()); err != nil {
if factories.Receivers, err = receiver.MakeFactoryMap(receivertest.NewNopFactory(), receivertest.NewNopFactoryForType(component.DataTypeLogs)); err != nil {
return Factories{}, err
}

Expand Down
5 changes: 5 additions & 0 deletions otelcol/testdata/components-output.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ receivers:
logs: Stable
metrics: Stable
traces: Stable
- name: nop_logs
stability:
logs: Stable
metrics: Undefined
traces: Undefined
processors:
- name: nop
stability:
Expand Down
18 changes: 18 additions & 0 deletions otelcol/testdata/otelcol-invalid-receiver-type.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
receivers:
nop_logs:

processors:
nop:

exporters:
nop:

service:
telemetry:
metrics:
address: localhost:8888
pipelines:
traces:
receivers: [nop_logs]
processors: [nop]
exporters: [nop]
35 changes: 27 additions & 8 deletions receiver/receivertest/nop_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,48 @@ import (
"go.opentelemetry.io/collector/receiver"
)

var componentType = component.MustNewType("nop")
var defaultComponentType = component.MustNewType("nop")

// NewNopCreateSettings returns a new nop settings for Create*Receiver functions.
func NewNopCreateSettings() receiver.CreateSettings {
return receiver.CreateSettings{
ID: component.NewIDWithName(componentType, uuid.NewString()),
ID: component.NewIDWithName(defaultComponentType, uuid.NewString()),
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

// NewNopFactory returns a receiver.Factory that constructs nop receivers.
// NewNopFactory returns a receiver.Factory that constructs nop receivers supporting all data types.
func NewNopFactory() receiver.Factory {
return receiver.NewFactory(
componentType,
defaultComponentType,
func() component.Config { return &nopConfig{} },
receiver.WithTraces(createTraces, component.StabilityLevelStable),
receiver.WithMetrics(createMetrics, component.StabilityLevelStable),
receiver.WithLogs(createLogs, component.StabilityLevelStable))
}

// NewNopFactoryForType returns a receiver.Factory that constructs nop receivers supporting only the
// given data type.
func NewNopFactoryForType(dataType component.DataType) receiver.Factory {
var factoryOpt receiver.FactoryOption
switch dataType {
case component.DataTypeTraces:
factoryOpt = receiver.WithTraces(createTraces, component.StabilityLevelStable)
case component.DataTypeMetrics:
factoryOpt = receiver.WithMetrics(createMetrics, component.StabilityLevelStable)
case component.DataTypeLogs:
factoryOpt = receiver.WithLogs(createLogs, component.StabilityLevelStable)
default:
panic("unsupported data type for creating nop receiver factory: " + dataType.String())
}

componentType := component.MustNewType(defaultComponentType.String() + "_" + dataType.String())
return receiver.NewFactory(componentType, func() component.Config { return &nopConfig{} }, factoryOpt)
}

type nopConfig struct{}

func createTraces(context.Context, receiver.CreateSettings, component.Config, consumer.Traces) (receiver.Traces, error) {
return nopInstance, nil
}
Expand All @@ -47,8 +68,6 @@ func createLogs(context.Context, receiver.CreateSettings, component.Config, cons
return nopInstance, nil
}

type nopConfig struct{}

var nopInstance = &nopReceiver{}

// nopReceiver acts as a receiver for testing purposes.
Expand All @@ -61,6 +80,6 @@ type nopReceiver struct {
func NewNopBuilder() *receiver.Builder {
nopFactory := NewNopFactory()
return receiver.NewBuilder(
map[component.ID]component.Config{component.NewID(componentType): nopFactory.CreateDefaultConfig()},
map[component.Type]receiver.Factory{componentType: nopFactory})
map[component.ID]component.Config{component.NewID(defaultComponentType): nopFactory.CreateDefaultConfig()},
map[component.Type]receiver.Factory{defaultComponentType: nopFactory})
}

0 comments on commit de3ef01

Please sign in to comment.